Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #7740] Optimize LocalFileOffsetStore #7745

Merged
merged 3 commits into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,20 @@ public long readOffset(final MessageQueue mq, final ReadOffsetType type) {

@Override
public void persistAll(Set<MessageQueue> mqs) {
if (null == mqs || mqs.isEmpty())
if (null == mqs || mqs.isEmpty()) {
return;
}
OffsetSerializeWrapper offsetSerializeWrapper = null;
try {
offsetSerializeWrapper = readLocalOffset();
} catch (MQClientException e) {
log.error("readLocalOffset exception", e);
return;
}

OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper();
if (offsetSerializeWrapper == null) {
offsetSerializeWrapper = new OffsetSerializeWrapper();
}
for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
if (mqs.contains(entry.getKey())) {
AtomicLong offset = entry.getValue();
Expand All @@ -154,11 +164,40 @@ public void persistAll(Set<MessageQueue> mqs) {

@Override
public void persist(MessageQueue mq) {
if (mq == null) {
return;
}
AtomicLong offset = this.offsetTable.get(mq);
if (offset != null) {
OffsetSerializeWrapper offsetSerializeWrapper = null;
try {
offsetSerializeWrapper = readLocalOffset();
} catch (MQClientException e) {
log.error("readLocalOffset exception", e);
return;
}
if (offsetSerializeWrapper == null) {
offsetSerializeWrapper = new OffsetSerializeWrapper();
}
offsetSerializeWrapper.getOffsetTable().put(mq, offset);
String jsonString = offsetSerializeWrapper.toJson(true);
if (jsonString != null) {
try {
MixAll.string2File(jsonString, this.storePath);
} catch (IOException e) {
log.error("persist consumer offset exception, " + this.storePath, e);
}
}
}
}

@Override
public void removeOffset(MessageQueue mq) {

if (mq != null) {
this.offsetTable.remove(mq);
log.info("remove unnecessary messageQueue offset. group={}, mq={}, offsetTableSize={}", this.groupName, mq,
offsetTable.size());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.rocketmq.client.consumer.store;

import java.io.File;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
Expand Down Expand Up @@ -85,4 +86,48 @@ public void testCloneOffset() throws Exception {
assertThat(cloneOffsetTable.size()).isEqualTo(1);
assertThat(cloneOffsetTable.get(messageQueue)).isEqualTo(1024);
}

@Test
public void testPersist() throws Exception {
OffsetStore offsetStore = new LocalFileOffsetStore(mQClientFactory, group);

MessageQueue messageQueue0 = new MessageQueue(topic, brokerName, 0);
offsetStore.updateOffset(messageQueue0, 1024, false);
offsetStore.persist(messageQueue0);
assertThat(offsetStore.readOffset(messageQueue0, ReadOffsetType.READ_FROM_STORE)).isEqualTo(1024);

MessageQueue messageQueue1 = new MessageQueue(topic, brokerName, 1);
assertThat(offsetStore.readOffset(messageQueue1, ReadOffsetType.READ_FROM_STORE)).isEqualTo(-1);
}

@Test
public void testPersistAll() throws Exception {
OffsetStore offsetStore = new LocalFileOffsetStore(mQClientFactory, group);

MessageQueue messageQueue0 = new MessageQueue(topic, brokerName, 0);
offsetStore.updateOffset(messageQueue0, 1024, false);
offsetStore.persistAll(new HashSet<MessageQueue>(Collections.singletonList(messageQueue0)));
assertThat(offsetStore.readOffset(messageQueue0, ReadOffsetType.READ_FROM_STORE)).isEqualTo(1024);

MessageQueue messageQueue1 = new MessageQueue(topic, brokerName, 1);
MessageQueue messageQueue2 = new MessageQueue(topic, brokerName, 2);
offsetStore.updateOffset(messageQueue1, 1025, false);
offsetStore.updateOffset(messageQueue2, 1026, false);
offsetStore.persistAll(new HashSet<MessageQueue>(Arrays.asList(messageQueue1, messageQueue2)));

assertThat(offsetStore.readOffset(messageQueue0, ReadOffsetType.READ_FROM_STORE)).isEqualTo(1024);
assertThat(offsetStore.readOffset(messageQueue1, ReadOffsetType.READ_FROM_STORE)).isEqualTo(1025);
assertThat(offsetStore.readOffset(messageQueue2, ReadOffsetType.READ_FROM_STORE)).isEqualTo(1026);
}

@Test
public void testRemoveOffset() throws Exception {
OffsetStore offsetStore = new LocalFileOffsetStore(mQClientFactory, group);
MessageQueue messageQueue = new MessageQueue(topic, brokerName, 0);
offsetStore.updateOffset(messageQueue, 1024, false);
assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY)).isEqualTo(1024);

offsetStore.removeOffset(messageQueue);
assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY)).isEqualTo(-1);
}
}