
[BUG] There're still bugs during unload after #404 #493

Closed
BewareMyPower opened this issue May 12, 2021 · 1 comment · Fixed by #495

@BewareMyPower (Collaborator)

Describe the bug
#404 tried to fix the bundle unload bug by removing topics that are not owned by the current broker from the static caches in the namespace bundle ownership listener. However, the unload tests still fail easily because the KafkaTopicConsumerManager (TCM for short) somehow still caches some null values. A detailed analysis is given in the last section.
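
For context, here is a minimal, self-contained sketch of the caching pattern involved (stand-in names; the real field is KafkaTopicManager#consumerTopicManagers). Once a lookup completes with null because the bundle was unloaded, the null-completed future stays in the map and is handed back on every later request:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class StaleTcmCacheSketch {
    // Stand-in for KafkaTopicConsumerManager.
    static class TopicConsumerManager { }

    // Stand-in for KafkaTopicManager#consumerTopicManagers.
    static final ConcurrentHashMap<String, CompletableFuture<TopicConsumerManager>> cache =
            new ConcurrentHashMap<>();

    static CompletableFuture<TopicConsumerManager> getTcm(String topic, boolean ownedByThisBroker) {
        // The map caches the future itself, not its value: if the topic was just
        // unloaded, the future completes with null and that null-completed future
        // keeps being returned, even after the bundle is loaded back.
        return cache.computeIfAbsent(topic, t -> CompletableFuture.completedFuture(
                ownedByThisBroker ? new TopicConsumerManager() : null));
    }

    public static void main(String[] args) {
        getTcm("kopMutiBrokerUnloadReload10-partition-2", false);  // looked up during unload
        System.out.println(getTcm("kopMutiBrokerUnloadReload10-partition-2", true).join());  // prints null
    }
}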

To Reproduce
Run the unload test several times:

mvn test -pl tests -Dtest='DistributedClusterTest#testMutiBrokerUnloadReload'

Expected behavior
The test should pass consistently.

Additional context
This section gives the related logs for one failure. The test failed because kafkaConsumeCommitMessage timed out.

First, we can grep for the logs printed by this code in the test:

while (i < numMessages) {
    if (log.isDebugEnabled()) {
        log.debug("kConsumer {} start poll message: {}",
                kConsumer.getTopic() + kConsumer.getConsumerGroup(), i);
    }
    // ... the loop then polls and commits messages ...
}

17:42:54.275 [TestNG-method=testMutiBrokerUnloadReload-1:io.streamnative.pulsar.handlers.kop.DistributedClusterTest@230] DEBUG io.streamnative.pulsar.handlers.kop.DistributedClusterTest - kConsumer kopMutiBrokerUnloadReload10consumer-group-1 start poll message: 30
...
17:43:12.291 [TestNG-method=testMutiBrokerUnloadReload-1:io.streamnative.pulsar.handlers.kop.DistributedClusterTest@230] DEBUG io.streamnative.pulsar.handlers.kop.DistributedClusterTest - kConsumer kopMutiBrokerUnloadReload10consumer-group-1 start poll message: 30

Then, from the client's logs:

17:42:54.175 [TestNG-method=testMutiBrokerUnloadReload-1:org.apache.kafka.clients.consumer.internals.Fetcher$1@227] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=consumer-group-1] Fetch READ_UNCOMMITTED at offset 4 for partition kopMutiBrokerUnloadReload10-2 returned fetch data (error=NOT_LEADER_FOR_PARTITION, highWaterMark=-1, lastStableOffset = -1, logStartOffset = -1, abortedTransactions = null, recordsSizeInBytes=0)
17:42:54.175 [TestNG-method=testMutiBrokerUnloadReload-1:org.apache.kafka.clients.consumer.internals.Fetcher$1@227] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=consumer-group-1] Fetch READ_UNCOMMITTED at offset 2 for partition kopMutiBrokerUnloadReload10-7 returned fetch data (error=NOT_LEADER_FOR_PARTITION, highWaterMark=-1, lastStableOffset = -1, logStartOffset = -1, abortedTransactions = null, recordsSizeInBytes=0)
17:42:54.176 [TestNG-method=testMutiBrokerUnloadReload-1:org.apache.kafka.clients.consumer.internals.Fetcher$1@227] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=consumer-group-1] Fetch READ_UNCOMMITTED at offset 2 for partition kopMutiBrokerUnloadReload10-6 returned fetch data (error=NOT_LEADER_FOR_PARTITION, highWaterMark=-1, lastStableOffset = -1, logStartOffset = -1, abortedTransactions = null, recordsSizeInBytes=0)
17:42:54.176 [TestNG-method=testMutiBrokerUnloadReload-1:org.apache.kafka.clients.consumer.internals.Fetcher$1@227] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=consumer-group-1] Fetch READ_UNCOMMITTED at offset 12 for partition kopMutiBrokerUnloadReload10-3 returned fetch data (error=NOT_LEADER_FOR_PARTITION, highWaterMark=-1, lastStableOffset = -1, logStartOffset = -1, abortedTransactions = null, recordsSizeInBytes=0)

We can see that partitions 2, 3, 6 and 7 received the NOT_LEADER_FOR_PARTITION error, which is caused by a null TCM when KoP handles FETCH requests. The above four lines repeated many times until the test failed (see the timestamps below).

17:43:12.644 [TestNG-method=testMutiBrokerUnloadReload-1:org.apache.kafka.clients.consumer.internals.Fetcher$1@227] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=consumer-group-1] Fetch READ_UNCOMMITTED at offset 4 for partition kopMutiBrokerUnloadReload10-2 returned fetch data (error=NOT_LEADER_FOR_PARTITION, highWaterMark=-1, lastStableOffset = -1, logStartOffset = -1, abortedTransactions = null, recordsSizeInBytes=0)
17:43:12.644 [TestNG-method=testMutiBrokerUnloadReload-1:org.apache.kafka.clients.consumer.internals.Fetcher$1@227] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=consumer-group-1] Fetch READ_UNCOMMITTED at offset 2 for partition kopMutiBrokerUnloadReload10-7 returned fetch data (error=NOT_LEADER_FOR_PARTITION, highWaterMark=-1, lastStableOffset = -1, logStartOffset = -1, abortedTransactions = null, recordsSizeInBytes=0)
17:43:12.644 [TestNG-method=testMutiBrokerUnloadReload-1:org.apache.kafka.clients.consumer.internals.Fetcher$1@227] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=consumer-group-1] Fetch READ_UNCOMMITTED at offset 2 for partition kopMutiBrokerUnloadReload10-6 returned fetch data (error=NOT_LEADER_FOR_PARTITION, highWaterMark=-1, lastStableOffset = -1, logStartOffset = -1, abortedTransactions = null, recordsSizeInBytes=0)
17:43:12.644 [TestNG-method=testMutiBrokerUnloadReload-1:org.apache.kafka.clients.consumer.internals.Fetcher$1@227] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=consumer-group-1] Fetch READ_UNCOMMITTED at offset 12 for partition kopMutiBrokerUnloadReload10-3 returned fetch data (error=NOT_LEADER_FOR_PARTITION, highWaterMark=-1, lastStableOffset = -1, logStartOffset = -1, abortedTransactions = null, recordsSizeInBytes=0)
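
As a rough illustration of why the client sees NOT_LEADER_FOR_PARTITION, here is a self-contained sketch with stand-in types (not the actual MessageFetchContext#handleFetch code): a null TCM in the fetch path is translated into that error, and with the stale cache the consumer's retries keep hitting the same null.

import java.util.concurrent.CompletableFuture;

public class FetchErrorSketch {
    enum Errors { NONE, NOT_LEADER_FOR_PARTITION }
    static class TopicConsumerManager { }

    static Errors handlePartitionFetch(CompletableFuture<TopicConsumerManager> tcmFuture) {
        TopicConsumerManager tcm = tcmFuture.join();
        if (tcm == null) {
            // A null TCM means this broker does not (or no longer) own the partition,
            // so the consumer is told to refresh its metadata and retry.
            return Errors.NOT_LEADER_FOR_PARTITION;
        }
        return Errors.NONE;  // ... otherwise read entries via the TCM and build the response
    }

    public static void main(String[] args) {
        System.out.println(handlePartitionFetch(CompletableFuture.completedFuture(null)));
        // prints NOT_LEADER_FOR_PARTITION
    }
}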

From the broker's logs, we can see that at the end, when the TCMs were closed, an NPE happened.

17:43:14.826 [pulsar-io-20-24:io.streamnative.pulsar.handlers.kop.KafkaTopicManager@335] ERROR io.streamnative.pulsar.handlers.kop.KafkaTopicManager - [[id: 0xb3b718d7, L:/127.0.0.1:15010 ! R:/127.0.0.1:57836]] Failed to close KafkaTopicManager. exception:
java.lang.NullPointerException: null
at io.streamnative.pulsar.handlers.kop.KafkaTopicManager.close(KafkaTopicManager.java:313) [pulsar-protocol-handler-kafka-2.8.0-SNAPSHOT.jar:?]

The NPE log repeated 8 times.

See the close loop in KafkaTopicManager#close:

for (CompletableFuture<KafkaTopicConsumerManager> manager : consumerTopicManagers.values()) {
    manager.get().close();
}

This proves that consumerTopicManagers contains many futures that were completed with null.
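
A null guard before closing would avoid the NPE. Roughly, and only as a sketch of the idea rather than the exact patched code in #495:

for (CompletableFuture<KafkaTopicConsumerManager> manager : consumerTopicManagers.values()) {
    // Only close TCMs whose futures have already completed with a non-null value;
    // the futures completed with null are exactly the entries left behind by the unload.
    KafkaTopicConsumerManager tcm = manager.getNow(null);
    if (tcm != null) {
        tcm.close();
    }
}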

@BewareMyPower (Collaborator, Author)

I've tested multiple times; the results are:

XOOOO OOXOO OOXOO OOOOO

where O means the run passed and X means it failed.

jiazhai pushed a commit that referenced this issue May 14, 2021
Fixes #493

This bug was introduced by #473. In `MessageFetchContext#handleFetch`, when the `KafkaTopicConsumerManager`'s future is completed with null, we should remove the future from `KafkaTopicManager#consumerTopicManagers`.

In addition, this PR includes some refactoring of `consumerTopicManagers`:
1. Don't expose this field via a getter; use methods to operate on it instead.
2. Check whether the completed future holds null before closing the `KafkaTopicConsumerManager`.
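
A rough, self-contained sketch of the main idea, with stand-in types and a hypothetical helper (see #495 for the actual change): when the cached future has completed with null, evict it from `consumerTopicManagers` before returning the error, so the next FETCH after the bundle is loaded back can build a fresh TCM.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class HandleFetchFixSketch {
    enum Errors { NONE, NOT_LEADER_FOR_PARTITION }
    static class KafkaTopicConsumerManager { }

    // Stand-in for KafkaTopicManager#consumerTopicManagers, no longer exposed via a getter.
    private static final ConcurrentHashMap<String, CompletableFuture<KafkaTopicConsumerManager>>
            consumerTopicManagers = new ConcurrentHashMap<>();

    // Hypothetical per-partition handling that mirrors the description above.
    static Errors handlePartitionFetch(String fullTopicName) {
        CompletableFuture<KafkaTopicConsumerManager> future = consumerTopicManagers.get(fullTopicName);
        if (future == null) {
            return Errors.NOT_LEADER_FOR_PARTITION;
        }
        if (future.isDone() && future.getNow(null) == null) {
            // The future completed with null (topic not owned by this broker after the
            // unload): drop the stale entry instead of letting every retry hit it again.
            consumerTopicManagers.remove(fullTopicName, future);
            return Errors.NOT_LEADER_FOR_PARTITION;
        }
        return Errors.NONE;  // ... read entries via the TCM and build the fetch response
    }
}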