Skip to content

Commit

Permalink
[improve][client] Pulsar client supports multi-topic messageId deseri…
Browse files Browse the repository at this point in the history
…alization to ack messages
  • Loading branch information
rdhabalia committed Apr 11, 2023
1 parent 1545396 commit 04f52fa
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

import com.google.common.collect.Lists;
import com.google.common.collect.Range;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -42,6 +45,9 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import lombok.Cleanup;

import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.ConsumerImpl;
Expand Down Expand Up @@ -351,4 +357,66 @@ public int choosePartition(Message<?> msg, TopicMetadata metadata) {
}
consumer.close();
}

/**
* It tests acking of messageId created from byte[] and validates client acks messages successfully.
* @throws Exception
*/
@Test
public void testMultiTopicAckWithByteMessageId() throws Exception {
String topicName = newTopicName();
int numPartitions = 2;
int numMessages = 100000;
admin.topics().createPartitionedTopic(topicName, numPartitions);

Producer<Long>[] producers = new Producer[numPartitions];

for (int i = 0; i < numPartitions; i++) {
producers[i] = pulsarClient.newProducer(Schema.INT64)
// produce to each partition directly so that order can be maintained in sending
.topic(topicName + "-partition-" + i).enableBatching(true).maxPendingMessages(30000)
.maxPendingMessagesAcrossPartitions(60000).batchingMaxMessages(10000)
.batchingMaxPublishDelay(5, TimeUnit.SECONDS).batchingMaxBytes(4 * 1024 * 1024)
.blockIfQueueFull(true).create();
}

@Cleanup
Consumer<Long> consumer = pulsarClient.newConsumer(Schema.INT64)
// consume on the partitioned topic
.topic(topicName).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.receiverQueueSize(numMessages).subscriptionName(methodName).subscribe();

// produce sequence numbers to each partition topic
long sequenceNumber = 1L;
for (int i = 0; i < numMessages; i++) {
for (Producer<Long> producer : producers) {
producer.newMessage().value(sequenceNumber).sendAsync();
}
sequenceNumber++;
}
for (Producer<Long> producer : producers) {
producer.flush();
producer.close();
}

// receive and validate sequences in the partitioned topic
Map<String, AtomicLong> receivedSequences = new HashMap<>();
int receivedCount = 0;
while (receivedCount < numPartitions * numMessages) {
Message<Long> message = consumer.receiveAsync().get(5, TimeUnit.SECONDS);
byte[] idByte = message.getMessageId().toByteArray();
MessageId id = MessageId.fromByteArray(idByte);
consumer.acknowledge(id);
receivedCount++;
AtomicLong receivedSequenceCounter = receivedSequences.computeIfAbsent(message.getTopicName(),
k -> new AtomicLong(1L));
Assert.assertEquals(message.getValue().longValue(), receivedSequenceCounter.getAndIncrement());
}
Assert.assertEquals(numPartitions * numMessages, receivedCount);
consumer.close();

PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(topicName + "-partition-0", false).get().get();
Range<PositionImpl> range = topic.getManagedLedger().getCursors().iterator().next().getLastIndividualDeletedRange();
assertNull(range);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ public Impl(String topic, MessageId messageId) {
this.messageId = (MessageIdAdv) messageId;
}

protected MessageId getMessageId() {
return messageId;
}

@Override
public byte[] toByteArray() {
return messageId.toByteArray();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ public byte[] toByteArray() {
return toByteArray(batchIndex, batchSize);
}

@Override
protected byte[] toByteArray(String topic) {
return toByteArray(batchIndex, batchSize, topic);
}

@Deprecated
public boolean ackIndividual() {
return MessageIdAdvUtils.acknowledge(this, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.netty.util.concurrent.FastThreadLocal;
import java.io.IOException;
import java.util.Objects;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.common.api.proto.MessageIdData;
Expand Down Expand Up @@ -93,7 +94,7 @@ public static MessageId fromByteArray(byte[] data) throws IOException {
throw new IOException(e);
}

MessageIdImpl messageId;
MessageId messageId;
if (idData.hasBatchIndex()) {
if (idData.hasBatchSize()) {
messageId = new BatchMessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition(),
Expand All @@ -112,6 +113,11 @@ public static MessageId fromByteArray(byte[] data) throws IOException {
messageId = new MessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition());
}

if (idData.hasTopicName()) {
String topicName = idData.getTopicName();
messageId = new TopicMessageIdImpl(topicName, topicName, messageId);
}

return messageId;
}

Expand Down Expand Up @@ -174,7 +180,18 @@ protected MessageIdData writeMessageIdData(MessageIdData msgId, int batchIndex,

// batchIndex is -1 if message is non-batched message and has the batchIndex for a batch message
protected byte[] toByteArray(int batchIndex, int batchSize) {
return toByteArray(batchIndex, batchSize, null);
}

protected byte[] toByteArray(String topic) {
return toByteArray(-1, 0, topic);
}

protected byte[] toByteArray(int batchIndex, int batchSize, String topicName) {
MessageIdData msgId = writeMessageIdData(null, batchIndex, batchSize);
if (StringUtils.isNotBlank(topicName)) {
msgId.setTopicName(topicName);
}

int size = msgId.getSerializedSize();
ByteBuf serialized = Unpooled.buffer(size, size);
Expand All @@ -186,6 +203,6 @@ protected byte[] toByteArray(int batchIndex, int batchSize) {
@Override
public byte[] toByteArray() {
// there is no message batch so we pass -1
return toByteArray(-1, 0);
return toByteArray(-1, 0, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,14 @@ public boolean equals(Object obj) {
public int hashCode() {
return super.hashCode();
}
}

@Override
public byte[] toByteArray() {
MessageId id = getMessageId();
if (id instanceof MessageIdImpl) {
return ((MessageIdImpl) id).toByteArray(-1, 0, getOwnerTopic());
} else {
return id.toByteArray();
}
}
}
2 changes: 1 addition & 1 deletion pulsar-common/src/main/proto/PulsarApi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ message MessageIdData {
optional int32 batch_index = 4 [default = -1];
repeated int64 ack_set = 5;
optional int32 batch_size = 6;

// For the chunk message id, we need to specify the first chunk message id.
optional MessageIdData first_chunk_message_id = 7;
optional string topicName = 8;
}

message KeyValue {
Expand Down

0 comments on commit 04f52fa

Please sign in to comment.