Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #7547] Let consumer be aware of message queue assignment change #7548

Merged
merged 7 commits into from
Nov 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/bazel.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ jobs:
- name: Build
run: bazel build --config=remote //...
- name: Run Tests
run: bazel test --config=remote --nocache_test_results //...
run: bazel test --config=remote //...
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,11 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
*/
private MessageListener messageListener;

/**
* Listener to call if message queue assignment is changed.
*/
private MessageQueueListener messageQueueListener;

/**
* Offset Storage
*/
Expand Down Expand Up @@ -987,4 +992,12 @@ public boolean isClientRebalance() {
public void setClientRebalance(boolean clientRebalance) {
this.clientRebalance = clientRebalance;
}

public MessageQueueListener getMessageQueueListener() {
return messageQueueListener;
}

public void setMessageQueueListener(MessageQueueListener messageQueueListener) {
this.messageQueueListener = messageQueueListener;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,22 @@
*/
public interface MQConsumer extends MQAdmin {
/**
* If consuming failure,message will be send back to the brokers,and delay consuming some time
* If consuming of messages failed, they will be sent back to the brokers for another delivery attempt after
* interval specified in delay level.
*/
@Deprecated
void sendMessageBack(final MessageExt msg, final int delayLevel) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException;

/**
* If consuming failure,message will be send back to the broker,and delay consuming some time
* If consuming of messages failed, they will be sent back to the brokers for another delivery attempt after
* interval specified in delay level.
*/
void sendMessageBack(final MessageExt msg, final int delayLevel, final String brokerName)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException;

/**
* Fetch message queues from consumer cache according to the topic
* Fetch message queues from consumer cache pertaining to the given topic.
*
* @param topic message topic
* @return queue set
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ public interface MessageQueueListener {
/**
* @param topic message topic
* @param mqAll all queues in this message topic
* @param mqDivided collection of queues,assigned to the current consumer
* @param mqAssigned collection of queues, assigned to the current consumer
*/
void messageQueueChanged(final String topic, final Set<MessageQueue> mqAll,
final Set<MessageQueue> mqDivided);
void messageQueueChanged(final String topic, final Set<MessageQueue> mqAll, final Set<MessageQueue> mqAssigned);
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.rocketmq.client.consumer.AckResult;
import org.apache.rocketmq.client.consumer.AckStatus;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageQueueListener;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.PopCallback;
import org.apache.rocketmq.client.consumer.PopResult;
Expand Down Expand Up @@ -132,7 +133,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private long queueMaxSpanFlowControlTimes = 0;

//10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
private int[] popDelayLevel = new int[] {10, 30, 60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200};
private final int[] popDelayLevel = new int[] {10, 30, 60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200};

private static final int MAX_POP_INVISIBLE_TIME = 300000;
private static final int MIN_POP_INVISIBLE_TIME = 5000;
Expand Down Expand Up @@ -1553,4 +1554,11 @@ public void setPullTimeDelayMillsWhenException(long pullTimeDelayMillsWhenExcept
int[] getPopDelayLevel() {
return popDelayLevel;
}

public MessageQueueListener getMessageQueueListener() {
if (null == defaultMQPushConsumer) {
return null;
}
return defaultMQPushConsumer.getMessageQueueListener();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.consumer.MessageQueueListener;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
import org.apache.rocketmq.client.exception.MQClientException;
Expand Down Expand Up @@ -52,7 +53,7 @@ public RebalancePushImpl(String consumerGroup, MessageModel messageModel,

@Override
public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
/**
/*
* When rebalance result changed, should update subscription's version to notify broker.
* Fix: inconsistency subscription may lead to consumer miss messages.
*/
Expand Down Expand Up @@ -82,6 +83,11 @@ public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<Messa

// notify broker
this.getmQClientFactory().sendHeartbeatToAllBrokerWithLockV2(true);

MessageQueueListener messageQueueListener = defaultMQPushConsumerImpl.getMessageQueueListener();
if (null != messageQueueListener) {
messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public static LocalRemotingCommand createRequestCommand(int code, CommandCustomH
cmd.writeCustomHeader(customHeader);
cmd.setExtFields(new HashMap<>());
setCmdVersion(cmd);
cmd.makeCustomHeaderToNet();
return cmd;
}

Expand Down
1 change: 1 addition & 0 deletions remoting/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ java_library(
"@maven//:io_opentelemetry_opentelemetry_sdk_metrics",
"@maven//:org_apache_tomcat_annotations_api",
"@maven//:org_apache_commons_commons_lang3",
"@maven//:org_jetbrains_annotations",
],
resources = glob(["src/test/resources/certs/*.pem"]) + glob(["src/test/resources/certs/*.key"])
)
Expand Down
1 change: 1 addition & 0 deletions store/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ GenTestRules(
"src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest",
"src/test/java/org/apache/rocketmq/store/MappedFileQueueTest",
"src/test/java/org/apache/rocketmq/store/queue/BatchConsumeMessageTest",
"src/test/java/org/apache/rocketmq/store/dledger/MixCommitlogTest",
],
test_files = glob(["src/test/java/**/*Test.java"]),
deps = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.rocketmq.test.client.consumer.balance;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.test.base.BaseConf;
Expand Down Expand Up @@ -112,4 +114,19 @@ public void test3ConsumerAndCrashOne() {
consumer2.getListener().getAllUndupMsgBody()).size());
assertThat(balance).isEqualTo(true);
}

@Test
public void testMessageQueueListener() throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);

RMQNormalConsumer consumer1 = getConsumer(NAMESRV_ADDR, topic, "*", new RMQNormalListener());
// Register message queue listener
consumer1.getConsumer().setMessageQueueListener((topic, mqAll, mqAssigned) -> latch.countDown());

// Without message queue listener
RMQNormalConsumer consumer2 = getConsumer(NAMESRV_ADDR, consumer1.getConsumerGroup(), topic,
"*", new RMQNormalListener());

Assert.assertTrue(latch.await(30, TimeUnit.SECONDS));
}
}
1 change: 1 addition & 0 deletions tieredstore/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ java_library(
"@maven//:org_apache_tomcat_annotations_api",
"@maven//:com_alibaba_fastjson",
"@maven//:org_apache_rocketmq_rocketmq_rocksdb",
"@maven//:commons_collections_commons_collections",
],
)

Expand Down
1 change: 1 addition & 0 deletions tools/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ java_library(
"@maven//:commons_collections_commons_collections",
"@maven//:io_github_aliyunmq_rocketmq_slf4j_api",
"@maven//:io_github_aliyunmq_rocketmq_logback_classic",
"@maven//:org_apache_rocketmq_rocketmq_rocksdb",
],
)

Expand Down
Loading