diff --git a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigStorage.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigStorage.java index af259aaa374..a31b573daa7 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigStorage.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConfigStorage.java @@ -27,11 +27,11 @@ import org.apache.rocketmq.common.config.ConfigHelper; import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyOptions; -import org.rocksdb.DirectSlice; import org.rocksdb.ReadOptions; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; import org.rocksdb.RocksIterator; +import org.rocksdb.Slice; import org.rocksdb.WriteBatch; import org.rocksdb.WriteOptions; @@ -112,10 +112,16 @@ public RocksIterator iterate(ByteBuffer beginKey, ByteBuffer endKey) { readOptions.setTotalOrderSeek(true); readOptions.setTailing(false); readOptions.setAutoPrefixMode(true); - readOptions.setIterateLowerBound(new DirectSlice(beginKey)); - readOptions.setIterateUpperBound(new DirectSlice(endKey)); + // Use DirectSlice till the follow issue is fixed: + // https://github.com/facebook/rocksdb/issues/13098 + // + // readOptions.setIterateUpperBound(new DirectSlice(endKey)); + byte[] buf = new byte[endKey.remaining()]; + endKey.slice().get(buf); + readOptions.setIterateUpperBound(new Slice(buf)); + RocksIterator iterator = db.newIterator(defaultCFHandle, readOptions); - iterator.seekToFirst(); + iterator.seek(beginKey.slice()); return iterator; } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java index 5b0885c491a..2c5d3677d88 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java @@ -390,10 +390,12 @@ public void commitPullOffset(String clientHost, String group, String topic, int ByteBuf keyBuf = keyOfPullOffset(group, topic, queueId); ByteBuf valueBuf = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(8); + valueBuf.writeLong(offset); try (WriteBatch writeBatch = new WriteBatch()) { writeBatch.put(keyBuf.nioBuffer(), valueBuf.nioBuffer()); long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0; ConfigHelper.stampDataVersion(writeBatch, dataVersion, stateMachineVersion); + configStorage.write(writeBatch); } catch (RocksDBException e) { LOG.error("Failed to commit pull offset. group={}, topic={}, queueId={}, offset={}", group, topic, queueId, offset); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2.java index 8da6f9d2bc5..f535fa195a9 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2.java @@ -74,6 +74,7 @@ private boolean loadSubscriptions() { if (null != subscriptionGroupConfig) { super.updateSubscriptionGroupConfigWithoutPersist(subscriptionGroupConfig); } + iterator.next(); } } finally { beginKey.release(); @@ -163,6 +164,7 @@ protected SubscriptionGroupConfig removeSubscriptionGroupConfig(String groupName writeBatch.delete(ConfigHelper.readBytes(keyBuf)); long stateMachineVersion = brokerController.getMessageStore().getStateMachineVersion(); ConfigHelper.stampDataVersion(writeBatch, dataVersion, stateMachineVersion); + configStorage.write(writeBatch); } catch (RocksDBException e) { log.error("Failed to remove subscription group config by group-name={}", groupName, e); } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2Test.java b/broker/src/test/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2Test.java new file mode 100644 index 00000000000..d7f46855e1a --- /dev/null +++ b/broker/src/test/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2Test.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.broker.config.v2; + +import java.io.File; +import java.io.IOException; +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.broker.offset.ConsumerOffsetManager; +import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.MixAll; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class ConsumerOffsetManagerV2Test { + + private ConfigStorage configStorage; + + private ConsumerOffsetManagerV2 consumerOffsetManagerV2; + + @Mock + private BrokerController controller; + + @Rule + public TemporaryFolder tf = new TemporaryFolder(); + + @After + public void cleanUp() { + if (null != configStorage) { + configStorage.shutdown(); + } + } + + @Before + public void setUp() throws IOException { + BrokerConfig brokerConfig = new BrokerConfig(); + Mockito.doReturn(brokerConfig).when(controller).getBrokerConfig(); + + File configStoreDir = tf.newFolder(); + configStorage = new ConfigStorage(configStoreDir.getAbsolutePath()); + configStorage.start(); + consumerOffsetManagerV2 = new ConsumerOffsetManagerV2(controller, configStorage); + } + + /** + * Verify consumer offset can survive restarts + */ + @Test + public void testCommitOffset_Standard() { + Assert.assertTrue(consumerOffsetManagerV2.load()); + + String clientHost = "localhost"; + String topic = "T1"; + String group = "G0"; + int queueId = 1; + long queueOffset = 100; + consumerOffsetManagerV2.commitOffset(clientHost, group, topic, queueId, queueOffset); + Assert.assertEquals(queueOffset, consumerOffsetManagerV2.queryOffset(group, topic, queueId)); + + configStorage.shutdown(); + consumerOffsetManagerV2.getOffsetTable().clear(); + Assert.assertEquals(-1L, consumerOffsetManagerV2.queryOffset(group, topic, queueId)); + + configStorage.start(); + consumerOffsetManagerV2.load(); + Assert.assertEquals(queueOffset, consumerOffsetManagerV2.queryOffset(group, topic, queueId)); + } + + /** + * Verify commit offset can survive config store restart + */ + @Test + public void testCommitOffset_LMQ() { + Assert.assertTrue(consumerOffsetManagerV2.load()); + + String clientHost = "localhost"; + String topic = MixAll.LMQ_PREFIX + "T1"; + String group = "G0"; + int queueId = 1; + long queueOffset = 100; + consumerOffsetManagerV2.commitOffset(clientHost, group, topic, queueId, queueOffset); + Assert.assertEquals(queueOffset, consumerOffsetManagerV2.queryOffset(group, topic, queueId)); + + configStorage.shutdown(); + + configStorage.start(); + consumerOffsetManagerV2.load(); + Assert.assertEquals(queueOffset, consumerOffsetManagerV2.queryOffset(group, topic, queueId)); + } + + + /** + * Verify commit offset can survive config store restart + */ + @Test + public void testCommitPullOffset_LMQ() { + Assert.assertTrue(consumerOffsetManagerV2.load()); + + String clientHost = "localhost"; + String topic = MixAll.LMQ_PREFIX + "T1"; + String group = "G0"; + int queueId = 1; + long queueOffset = 100; + consumerOffsetManagerV2.commitPullOffset(clientHost, group, topic, queueId, queueOffset); + Assert.assertEquals(queueOffset, consumerOffsetManagerV2.queryPullOffset(group, topic, queueId)); + + configStorage.shutdown(); + + configStorage.start(); + consumerOffsetManagerV2.load(); + Assert.assertEquals(queueOffset, consumerOffsetManagerV2.queryPullOffset(group, topic, queueId)); + } + + /** + * Verify commit offset can survive config store restart + */ + @Test + public void testRemoveByTopicAtGroup() { + Assert.assertTrue(consumerOffsetManagerV2.load()); + + String clientHost = "localhost"; + String topic = MixAll.LMQ_PREFIX + "T1"; + String topic2 = MixAll.LMQ_PREFIX + "T2"; + String group = "G0"; + int queueId = 1; + long queueOffset = 100; + consumerOffsetManagerV2.commitOffset(clientHost, group, topic, queueId, queueOffset); + consumerOffsetManagerV2.commitOffset(clientHost, group, topic2, queueId, queueOffset); + Assert.assertEquals(queueOffset, consumerOffsetManagerV2.queryOffset(group, topic, queueId)); + Assert.assertEquals(queueOffset, consumerOffsetManagerV2.queryOffset(group, topic2, queueId)); + + consumerOffsetManagerV2.removeConsumerOffset(topic + ConsumerOffsetManager.TOPIC_GROUP_SEPARATOR + group); + Assert.assertEquals(-1L, consumerOffsetManagerV2.queryOffset(group, topic, queueId)); + Assert.assertEquals(queueOffset, consumerOffsetManagerV2.queryOffset(group, topic2, queueId)); + + configStorage.shutdown(); + configStorage.start(); + consumerOffsetManagerV2.load(); + Assert.assertEquals(-1L, consumerOffsetManagerV2.queryOffset(group, topic, queueId)); + Assert.assertEquals(queueOffset, consumerOffsetManagerV2.queryOffset(group, topic2, queueId)); + } + + /** + * Verify commit offset can survive config store restart + */ + @Test + public void testRemoveByGroup() { + Assert.assertTrue(consumerOffsetManagerV2.load()); + + String clientHost = "localhost"; + String topic = MixAll.LMQ_PREFIX + "T1"; + String topic2 = MixAll.LMQ_PREFIX + "T2"; + String group = "G0"; + int queueId = 1; + long queueOffset = 100; + consumerOffsetManagerV2.commitOffset(clientHost, group, topic, queueId, queueOffset); + consumerOffsetManagerV2.commitOffset(clientHost, group, topic2, queueId, queueOffset); + Assert.assertEquals(queueOffset, consumerOffsetManagerV2.queryOffset(group, topic, queueId)); + consumerOffsetManagerV2.removeOffset(group); + Assert.assertEquals(-1L, consumerOffsetManagerV2.queryOffset(group, topic, queueId)); + Assert.assertEquals(-1L, consumerOffsetManagerV2.queryOffset(group, topic2, queueId)); + + configStorage.shutdown(); + configStorage.start(); + consumerOffsetManagerV2.load(); + Assert.assertEquals(-1L, consumerOffsetManagerV2.queryOffset(group, topic, queueId)); + Assert.assertEquals(-1L, consumerOffsetManagerV2.queryOffset(group, topic2, queueId)); + } + +} diff --git a/broker/src/test/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2Test.java b/broker/src/test/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2Test.java new file mode 100644 index 00000000000..6d436a7c4db --- /dev/null +++ b/broker/src/test/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2Test.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.broker.config.v2; + +import java.io.File; +import java.io.IOException; +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.remoting.protocol.subscription.GroupRetryPolicy; +import org.apache.rocketmq.remoting.protocol.subscription.GroupRetryPolicyType; +import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; +import org.apache.rocketmq.store.MessageStore; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class SubscriptionGroupManagerV2Test { + private ConfigStorage configStorage; + + private SubscriptionGroupManagerV2 subscriptionGroupManagerV2; + + @Mock + private BrokerController controller; + + @Mock + private MessageStore messageStore; + + @Rule + public TemporaryFolder tf = new TemporaryFolder(); + + @After + public void cleanUp() { + if (null != configStorage) { + configStorage.shutdown(); + } + } + + @Before + public void setUp() throws IOException { + BrokerConfig brokerConfig = new BrokerConfig(); + brokerConfig.setAutoCreateSubscriptionGroup(false); + Mockito.doReturn(brokerConfig).when(controller).getBrokerConfig(); + + Mockito.doReturn(messageStore).when(controller).getMessageStore(); + Mockito.doReturn(1L).when(messageStore).getStateMachineVersion(); + + File configStoreDir = tf.newFolder(); + configStorage = new ConfigStorage(configStoreDir.getAbsolutePath()); + configStorage.start(); + subscriptionGroupManagerV2 = new SubscriptionGroupManagerV2(controller, configStorage); + } + + + @Test + public void testUpdateSubscriptionGroupConfig() { + SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); + subscriptionGroupConfig.setGroupName("G1"); + subscriptionGroupConfig.setConsumeEnable(true); + subscriptionGroupConfig.setRetryMaxTimes(16); + subscriptionGroupConfig.setGroupSysFlag(1); + GroupRetryPolicy retryPolicy = new GroupRetryPolicy(); + retryPolicy.setType(GroupRetryPolicyType.EXPONENTIAL); + subscriptionGroupConfig.setGroupRetryPolicy(retryPolicy); + subscriptionGroupConfig.setBrokerId(1); + subscriptionGroupConfig.setConsumeBroadcastEnable(true); + subscriptionGroupConfig.setConsumeMessageOrderly(true); + subscriptionGroupConfig.setConsumeTimeoutMinute(30); + subscriptionGroupConfig.setConsumeFromMinEnable(true); + subscriptionGroupConfig.setWhichBrokerWhenConsumeSlowly(1); + subscriptionGroupConfig.setNotifyConsumerIdsChangedEnable(true); + subscriptionGroupManagerV2.updateSubscriptionGroupConfig(subscriptionGroupConfig); + + SubscriptionGroupConfig found = subscriptionGroupManagerV2.findSubscriptionGroupConfig(subscriptionGroupConfig.getGroupName()); + Assert.assertEquals(subscriptionGroupConfig, found); + + subscriptionGroupManagerV2.getSubscriptionGroupTable().clear(); + configStorage.shutdown(); + configStorage.start(); + subscriptionGroupManagerV2.load(); + found = subscriptionGroupManagerV2.findSubscriptionGroupConfig(subscriptionGroupConfig.getGroupName()); + Assert.assertEquals(subscriptionGroupConfig, found); + } + + + @Test + public void testDeleteSubscriptionGroupConfig() { + SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig(); + subscriptionGroupConfig.setGroupName("G1"); + subscriptionGroupConfig.setConsumeEnable(true); + subscriptionGroupConfig.setRetryMaxTimes(16); + subscriptionGroupConfig.setGroupSysFlag(1); + GroupRetryPolicy retryPolicy = new GroupRetryPolicy(); + retryPolicy.setType(GroupRetryPolicyType.EXPONENTIAL); + subscriptionGroupConfig.setGroupRetryPolicy(retryPolicy); + subscriptionGroupConfig.setBrokerId(1); + subscriptionGroupConfig.setConsumeBroadcastEnable(true); + subscriptionGroupConfig.setConsumeMessageOrderly(true); + subscriptionGroupConfig.setConsumeTimeoutMinute(30); + subscriptionGroupConfig.setConsumeFromMinEnable(true); + subscriptionGroupConfig.setWhichBrokerWhenConsumeSlowly(1); + subscriptionGroupConfig.setNotifyConsumerIdsChangedEnable(true); + subscriptionGroupManagerV2.updateSubscriptionGroupConfig(subscriptionGroupConfig); + + SubscriptionGroupConfig found = subscriptionGroupManagerV2.findSubscriptionGroupConfig(subscriptionGroupConfig.getGroupName()); + Assert.assertEquals(subscriptionGroupConfig, found); + subscriptionGroupManagerV2.removeSubscriptionGroupConfig(subscriptionGroupConfig.getGroupName()); + + found = subscriptionGroupManagerV2.findSubscriptionGroupConfig(subscriptionGroupConfig.getGroupName()); + Assert.assertNull(found); + + configStorage.shutdown(); + configStorage.start(); + subscriptionGroupManagerV2.load(); + found = subscriptionGroupManagerV2.findSubscriptionGroupConfig(subscriptionGroupConfig.getGroupName()); + Assert.assertNull(found); + } + +} diff --git a/broker/src/test/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2Test.java b/broker/src/test/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2Test.java new file mode 100644 index 00000000000..92c936b110a --- /dev/null +++ b/broker/src/test/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2Test.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.broker.config.v2; + +import java.io.File; +import java.io.IOException; + +import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.common.BrokerConfig; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + + +@RunWith(value = MockitoJUnitRunner.class) +public class TopicConfigManagerV2Test { + + private ConfigStorage configStorage; + + private TopicConfigManagerV2 topicConfigManagerV2; + + @Mock + private BrokerController controller; + + @Rule + public TemporaryFolder tf = new TemporaryFolder(); + + @After + public void cleanUp() { + if (null != configStorage) { + configStorage.shutdown(); + } + } + + @Before + public void setUp() throws IOException { + BrokerConfig brokerConfig = new BrokerConfig(); + Mockito.doReturn(brokerConfig).when(controller).getBrokerConfig(); + + MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); + Mockito.doReturn(messageStoreConfig).when(controller).getMessageStoreConfig(); + + File configStoreDir = tf.newFolder(); + configStorage = new ConfigStorage(configStoreDir.getAbsolutePath()); + configStorage.start(); + topicConfigManagerV2 = new TopicConfigManagerV2(controller, configStorage); + } + + @Test + public void testUpdateTopicConfig() { + TopicConfig topicConfig = new TopicConfig(); + String topicName = "T1"; + topicConfig.setTopicName(topicName); + topicConfig.setPerm(6); + topicConfig.setReadQueueNums(8); + topicConfig.setWriteQueueNums(4); + topicConfig.setOrder(true); + topicConfig.setTopicSysFlag(4); + topicConfigManagerV2.updateTopicConfig(topicConfig); + + Assert.assertTrue(configStorage.shutdown()); + + topicConfigManagerV2.getTopicConfigTable().clear(); + + Assert.assertTrue(configStorage.start()); + Assert.assertTrue(topicConfigManagerV2.load()); + + TopicConfig loaded = topicConfigManagerV2.selectTopicConfig(topicName); + Assert.assertNotNull(loaded); + Assert.assertEquals(topicName, loaded.getTopicName()); + Assert.assertEquals(6, loaded.getPerm()); + Assert.assertEquals(8, loaded.getReadQueueNums()); + Assert.assertEquals(4, loaded.getWriteQueueNums()); + Assert.assertTrue(loaded.isOrder()); + Assert.assertEquals(4, loaded.getTopicSysFlag()); + + Assert.assertTrue(topicConfigManagerV2.containsTopic(topicName)); + } + + @Test + public void testRemoveTopicConfig() { + TopicConfig topicConfig = new TopicConfig(); + String topicName = "T1"; + topicConfig.setTopicName(topicName); + topicConfig.setPerm(6); + topicConfig.setReadQueueNums(8); + topicConfig.setWriteQueueNums(4); + topicConfig.setOrder(true); + topicConfig.setTopicSysFlag(4); + topicConfigManagerV2.updateTopicConfig(topicConfig); + topicConfigManagerV2.removeTopicConfig(topicName); + Assert.assertFalse(topicConfigManagerV2.containsTopic(topicName)); + Assert.assertTrue(configStorage.shutdown()); + + Assert.assertTrue(configStorage.start()); + Assert.assertTrue(topicConfigManagerV2.load()); + Assert.assertFalse(topicConfigManagerV2.containsTopic(topicName)); + } +}