Skip to content

Commit

Permalink
[fix][client] Fix consumer can't consume resent chunked messages
Browse files Browse the repository at this point in the history
  • Loading branch information
RobertIndie committed Aug 26, 2023
1 parent 69298da commit 44a055b
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,47 @@ public void testMaxPendingChunkMessages() throws Exception {
assertNull(consumer.receive(5, TimeUnit.SECONDS));
}

@Test
public void testResendChunkMessages() throws Exception {
log.info("-- Starting {} test --", methodName);
final String topicName = "persistent://my-property/my-ns/testResendChunkMessages";

@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topicName)
.subscriptionName("my-subscriber-name")
.maxPendingChunkedMessage(10)
.autoAckOldestChunkedMessageOnQueueFull(true)
.subscribe();
@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topicName)
.chunkMaxMessageSize(100)
.enableChunking(true)
.enableBatching(false)
.create();

sendSingleChunk(producer, "0", 0, 2);

sendSingleChunk(producer, "0", 0, 2); // Resending the first chunk
sendSingleChunk(producer, "1", 0, 3); // This is for testing the interwoven chunked message
sendSingleChunk(producer, "1", 1, 3);
sendSingleChunk(producer, "1", 0, 3); // Resending the UUID-1 chunked message

sendSingleChunk(producer, "0", 1, 2);

Message<String> receivedMsg = consumer.receive(5, TimeUnit.SECONDS);
assertEquals(receivedMsg.getValue(), "chunk-0-0|chunk-0-1|");
consumer.acknowledge(receivedMsg);

sendSingleChunk(producer, "1", 1, 3);
sendSingleChunk(producer, "1", 2, 3);

receivedMsg = consumer.receive(5, TimeUnit.SECONDS);
assertEquals(receivedMsg.getValue(), "chunk-1-0|chunk-1-1|chunk-1-2|");
consumer.acknowledge(receivedMsg);
}

/**
* Validate that chunking is not supported with batching and non-persistent topic
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1433,7 +1433,17 @@ private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata m

ChunkedMessageCtx chunkedMsgCtx = chunkedMessagesMap.get(msgMetadata.getUuid());

if (msgMetadata.getChunkId() == 0 && chunkedMsgCtx == null) {
if (msgMetadata.getChunkId() == 0) {
if (chunkedMsgCtx != null) {
// The first chunk of a new chunked-message received before receiving other chunks of previous
// chunked-message
// so, remove previous chunked-message from map and release buffer
if (chunkedMsgCtx.chunkedMsgBuffer != null) {
ReferenceCountUtil.safeRelease(chunkedMsgCtx.chunkedMsgBuffer);
}
chunkedMsgCtx.recycle();
chunkedMessagesMap.remove(msgMetadata.getUuid());
}
pendingChunkedMessageCount++;
if (maxPendingChunkedMessage > 0 && pendingChunkedMessageCount > maxPendingChunkedMessage) {
removeOldestPendingChunkedMessage();
Expand Down

0 comments on commit 44a055b

Please sign in to comment.