[fix][client] Fix consumer can't consume resent chunked messages (#21070)

### Motivation

Currently, when the producer resends the chunks of a chunked message like this:
- M1: UUID: 0, ChunkID: 0
- M2: UUID: 0, ChunkID: 0 // Resend the first chunk
- M3: UUID: 0, ChunkID: 1

When the consumer receives M2, it finds that it is already tracking the chunked message with UUID:0 and therefore discards both M1 and M2. As a result, the whole chunked message can never be consumed, even though it is already persisted in the Pulsar topic.

Here is the code logic:
https://github.com/apache/pulsar/blob/44a055b8a55078bcf93f4904991598541aa6c1ee/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1436-L1482

The bug can be reproduced easily with the test case `testResendChunkMessages` introduced by this PR.
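For illustration, here is a minimal, self-contained sketch of the pre-fix tracking behavior. It is a simplification of the logic linked above, not the actual `ConsumerImpl` code; the `Ctx` class, `contexts` map, and `onChunk` method are invented for this sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal model of the pre-fix tracking logic (illustration only, not ConsumerImpl):
// a context is created only when chunkId == 0 AND no context exists yet, and any
// unexpected chunk evicts the whole context.
public class PreFixChunkTrackingSketch {

    // Per-UUID assembly state, standing in for the client's chunked-message context.
    static class Ctx {
        final StringBuilder buffer = new StringBuilder();
        int lastChunkId = -1;
    }

    static final Map<String, Ctx> contexts = new HashMap<>();

    // Returns the assembled payload when the last chunk arrives, otherwise null.
    static String onChunk(String uuid, int chunkId, int numChunks, String payload) {
        Ctx ctx = contexts.get(uuid);
        if (chunkId == 0 && ctx == null) {
            ctx = new Ctx();
            contexts.put(uuid, ctx);
        }
        // Pre-fix behavior: anything other than the expected next chunk (including a
        // resent chunk 0 while a context already exists) discards the whole context.
        if (ctx == null || chunkId != ctx.lastChunkId + 1) {
            contexts.remove(uuid); // the partial buffer (M1) is dropped here
            return null;           // and the incoming chunk (M2) is discarded too
        }
        ctx.lastChunkId = chunkId;
        ctx.buffer.append(payload);
        if (chunkId == numChunks - 1) {
            contexts.remove(uuid);
            return ctx.buffer.toString();
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(onChunk("0", 0, 2, "chunk-0-0|")); // null: chunk accepted, context created
        System.out.println(onChunk("0", 0, 2, "chunk-0-0|")); // null: resent chunk 0 evicts the context and is dropped
        System.out.println(onChunk("0", 1, 2, "chunk-0-1|")); // null: no context left, message never assembled
    }
}
```

Running `main` prints `null` three times: the resent first chunk evicts the context holding M1, and the final chunk then finds no context, so the message is never assembled.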

### Modifications

- When the consumer receives a duplicated first chunk of a chunked message, it now discards the current chunked message context and creates a new context to track the subsequent chunks. For the case described in the Motivation, M1 is released and the consumer assembles M2 and M3 into the complete chunked message, as illustrated in the sketch below.
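A sketch of the corrected handling under the same simplified model (it reuses the `Ctx` class and `contexts` map from the sketch above; again an illustration, not the actual `ConsumerImpl` change, which is shown in the diff below):

```java
// Corrected handling in the simplified model from the Motivation sketch:
// a duplicated first chunk resets the context instead of poisoning it.
static String onChunkFixed(String uuid, int chunkId, int numChunks, String payload) {
    Ctx ctx = contexts.get(uuid);
    if (chunkId == 0) {
        if (ctx != null) {
            // Drop the stale context; the real client also releases its buffer here.
            contexts.remove(uuid);
        }
        ctx = new Ctx();
        contexts.put(uuid, ctx);
    }
    if (ctx == null || chunkId != ctx.lastChunkId + 1) {
        contexts.remove(uuid);
        return null;
    }
    ctx.lastChunkId = chunkId;
    ctx.buffer.append(payload);
    if (chunkId == numChunks - 1) {
        contexts.remove(uuid);
        return ctx.buffer.toString();
    }
    return null;
}
```

With the M1/M2/M3 sequence, the duplicated chunk 0 now resets the context, so the consumer assembles `chunk-0-0|chunk-0-1|` once M3 arrives; because contexts are keyed by UUID, interwoven chunked messages remain independent.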

(cherry picked from commit eb2e3a2)
RobertIndie committed Aug 31, 2023
1 parent d422463 commit 3b085ea
Showing 2 changed files with 52 additions and 1 deletion.
@@ -356,6 +356,47 @@ public void testMaxPendingChunkMessages() throws Exception {
         assertNull(consumer.receive(5, TimeUnit.SECONDS));
     }

+    @Test
+    public void testResendChunkMessages() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+        final String topicName = "persistent://my-property/my-ns/testResendChunkMessages";
+
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName("my-subscriber-name")
+                .maxPendingChunkedMessage(10)
+                .autoAckOldestChunkedMessageOnQueueFull(true)
+                .subscribe();
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topicName)
+                .chunkMaxMessageSize(100)
+                .enableChunking(true)
+                .enableBatching(false)
+                .create();
+
+        sendSingleChunk(producer, "0", 0, 2);
+
+        sendSingleChunk(producer, "0", 0, 2); // Resending the first chunk
+        sendSingleChunk(producer, "1", 0, 3); // This is for testing the interwoven chunked message
+        sendSingleChunk(producer, "1", 1, 3);
+        sendSingleChunk(producer, "1", 0, 3); // Resending the UUID-1 chunked message
+
+        sendSingleChunk(producer, "0", 1, 2);
+
+        Message<String> receivedMsg = consumer.receive(5, TimeUnit.SECONDS);
+        assertEquals(receivedMsg.getValue(), "chunk-0-0|chunk-0-1|");
+        consumer.acknowledge(receivedMsg);
+
+        sendSingleChunk(producer, "1", 1, 3);
+        sendSingleChunk(producer, "1", 2, 3);
+
+        receivedMsg = consumer.receive(5, TimeUnit.SECONDS);
+        assertEquals(receivedMsg.getValue(), "chunk-1-0|chunk-1-1|chunk-1-2|");
+        consumer.acknowledge(receivedMsg);
+    }
+
     /**
      * Validate that chunking is not supported with batching and non-persistent topic
      *
@@ -1425,7 +1425,17 @@ private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata m

         ChunkedMessageCtx chunkedMsgCtx = chunkedMessagesMap.get(msgMetadata.getUuid());

-        if (msgMetadata.getChunkId() == 0 && chunkedMsgCtx == null) {
+        if (msgMetadata.getChunkId() == 0) {
+            if (chunkedMsgCtx != null) {
+                // The first chunk of a new chunked-message received before receiving other chunks of previous
+                // chunked-message
+                // so, remove previous chunked-message from map and release buffer
+                if (chunkedMsgCtx.chunkedMsgBuffer != null) {
+                    ReferenceCountUtil.safeRelease(chunkedMsgCtx.chunkedMsgBuffer);
+                }
+                chunkedMsgCtx.recycle();
+                chunkedMessagesMap.remove(msgMetadata.getUuid());
+            }
             pendingChunkedMessageCount++;
             if (maxPendingChunkedMessage > 0 && pendingChunkedMessageCount > maxPendingChunkedMessage) {
                 removeOldestPendingChunkedMessage();
