Skip to content

Commit

Permalink
[improve][client] PIP-229: Add a common interface to get fields of th…
Browse files Browse the repository at this point in the history
…e MessageIdData

Master issue: apache#18950

### Motivation

We need a common interface to get fields of the MessageIdData. After
that, we won't need to assert a MessageId implementation is an instance
of a specific class. And we can pass our customized MessageId
implementation to APIs like `acknowledge` and `seek`.

### Modifications

- Add `MessageIdAdv` to get fields of `MessageIdData`, make all
  MessageId implementations inherit it (except `MultiMessageIdImpl`).
- Deprecate `BatchMessageAcker` by adding the ACK bit set field and the
  `PreviousMessageAcknowledger` interface to `BatchMessageIdImpl`.
- Deprecate `TopicMessageIdImpl#getInnerMessageId` by passing the
  `TopicMessageIdImpl` directly.
- Remove `instanceof BatchMessageIdImpl` checks in `pulsar-client`
  module by casting to `MessageIdAdv`.
  • Loading branch information
BewareMyPower committed Jan 29, 2023
1 parent c17f355 commit c1660b4
Show file tree
Hide file tree
Showing 31 changed files with 518 additions and 613 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,11 @@
import org.apache.pulsar.client.api.ConsumerEventListener;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down Expand Up @@ -337,7 +336,7 @@ public void testSimpleConsumerEventsWithPartition() throws Exception {
}
totalMessages++;
consumer1.acknowledge(msg);
MessageIdImpl msgId = (MessageIdImpl) (((TopicMessageImpl)msg).getInnerMessageId());
MessageIdAdv msgId = (MessageIdAdv) msg.getMessageId();
receivedPtns.add(msgId.getPartitionIndex());
}

Expand All @@ -354,7 +353,7 @@ public void testSimpleConsumerEventsWithPartition() throws Exception {
}
totalMessages++;
consumer2.acknowledge(msg);
MessageIdImpl msgId = (MessageIdImpl) (((TopicMessageImpl)msg).getInnerMessageId());
MessageIdAdv msgId = (MessageIdAdv) msg.getMessageId();
receivedPtns.add(msgId.getPartitionIndex());
}
assertTrue(Sets.difference(listener1.inactivePtns, receivedPtns).isEmpty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.RelativeTimeUtil;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -679,8 +678,7 @@ public void testSeekByFunction() throws Exception {
if (message == null) {
break;
}
TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) message.getMessageId();
received.add(topicMessageId.getInnerMessageId());
received.add(message.getMessageId());
}
int msgNumFromPartition1 = list.size() / 2;
int msgNumFromPartition2 = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.PartitionedProducerImpl;
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -768,7 +767,7 @@ public void testMessageIdForSubscribeToSinglePartition() throws Exception {

for (int i = 0; i < totalMessages; i ++) {
msg = consumer1.receive(5, TimeUnit.SECONDS);
Assert.assertEquals(((MessageIdImpl)((TopicMessageIdImpl)msg.getMessageId()).getInnerMessageId()).getPartitionIndex(), 2);
Assert.assertEquals(((MessageIdAdv) msg.getMessageId()).getPartitionIndex(), 2);
consumer1.acknowledge(msg);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,10 @@ private AckTestData prepareDataForAck(String topic) throws PulsarClientException
messageIds.add(message.getMessageId());
}
MessageId firstEntryMessageId = messageIds.get(0);
MessageId secondEntryMessageId = ((BatchMessageIdImpl) messageIds.get(1)).toMessageIdImpl();
MessageId secondEntryMessageId = MessageIdAdvUtils.discardBatch(messageIds.get(1));
// Verify messages 2 to N must be in the same entry
for (int i = 2; i < messageIds.size(); i++) {
assertEquals(((BatchMessageIdImpl) messageIds.get(i)).toMessageIdImpl(), secondEntryMessageId);
assertEquals(MessageIdAdvUtils.discardBatch(messageIds.get(i)), secondEntryMessageId);
}

assertTrue(interceptor.individualAckedMessageIdList.isEmpty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,6 @@ public void producerSendAsync(TopicType topicType) throws PulsarClientException,
Message<byte[]> message = consumer.receive();
assertEquals(new String(message.getData()), messagePrefix + i);
MessageId messageId = message.getMessageId();
if (topicType == TopicType.PARTITIONED) {
messageId = ((TopicMessageIdImpl) messageId).getInnerMessageId();
}
assertTrue(messageIds.remove(messageId), "Failed to receive message");
}
log.info("Remaining message IDs = {}", messageIds);
Expand Down Expand Up @@ -166,9 +163,6 @@ public void producerSend(TopicType topicType) throws PulsarClientException, Puls

for (int i = 0; i < numberOfMessages; i++) {
MessageId messageId = consumer.receive().getMessageId();
if (topicType == TopicType.PARTITIONED) {
messageId = ((TopicMessageIdImpl) messageId).getInnerMessageId();
}
assertTrue(messageIds.remove(messageId), "Failed to receive Message");
}
log.info("Remaining message IDs = {}", messageIds);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;

import java.util.BitSet;

/**
* The {@link MessageId} interface provided for advanced users.
* <p>
* All built-in MessageId implementations should be able to be cast to MessageIdAdv.
* </p>
*/
public interface MessageIdAdv extends MessageId {

/**
* Get the ledger ID.
*
* @return the ledger ID
*/
long getLedgerId();

/**
* Get the entry ID.
*
* @return the entry ID
*/
long getEntryId();

/**
* Get the partition index.
*
* @return -1 if the message is from a non-partitioned topic, otherwise the non-negative partition index
*/
default int getPartitionIndex() {
return -1;
}

/**
* Get the batch index.
*
* @return -1 if the message is not in a batch
*/
default int getBatchIndex() {
return -1;
}

/**
* Get the batch size.
*
* @return 0 if the message is not in a batch
*/
default int getBatchSize() {
return 0;
}

/**
* Get the BitSet that indicates which messages in the batch.
*
* @implNote The message IDs of a batch should share a BitSet. For example, given 3 messages in the same batch whose
* size is 3, all message IDs of them should return "111" (i.e. a BitSet whose size is 3 and all bits are 1). If the
* 1st message has been acknowledged, the returned BitSet should become "011" (i.e. the 1st bit become 0).
*
* @return null if the message is a non-batched message
*/
default BitSet getAckSet() {
return null;
}

/**
* Get the message ID of the first chunk if the current message ID represents the position of a chunked message.
*
* @implNote A chunked message is distributed across different BookKeeper entries. The message ID of a chunked
* message is composed of two message IDs that represent positions of the first and the last chunk. The message ID
* itself represents the position of the last chunk.
*
* @return null if the message is not a chunked message
*/
default MessageIdAdv getFirstChunkMessageId() {
return null;
}

/**
* The default implementation of {@link Comparable#compareTo(Object)}.
*/
default int compareTo(MessageId o) {
if (!(o instanceof MessageIdAdv)) {
throw new UnsupportedOperationException("Unknown MessageId type: "
+ ((o != null) ? o.getClass().getName() : "null"));
}
final MessageIdAdv other = (MessageIdAdv) o;
int result = Long.compare(this.getLedgerId(), other.getLedgerId());
if (result != 0) {
return result;
}
result = Long.compare(this.getEntryId(), other.getEntryId());
if (result != 0) {
return result;
}
// TODO: Correct the following compare logics, see https://github.com/apache/pulsar/pull/18981
result = Integer.compare(this.getPartitionIndex(), other.getPartitionIndex());
if (result != 0) {
return result;
}
return Integer.compare(this.getBatchIndex(), other.getBatchIndex());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public interface AcknowledgmentsGroupingTracker extends AutoCloseable {

boolean isDuplicate(MessageId messageId);

CompletableFuture<Void> addAcknowledgment(MessageIdImpl msgId, AckType ackType, Map<String, Long> properties);
CompletableFuture<Void> addAcknowledgment(MessageId msgId, AckType ackType, Map<String, Long> properties);

CompletableFuture<Void> addListAcknowledgment(List<MessageId> messageIds, AckType ackType,
Map<String, Long> properties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.BitSet;

@Deprecated
public class BatchMessageAcker {

private BatchMessageAcker() {
Expand Down

This file was deleted.

Loading

0 comments on commit c1660b4

Please sign in to comment.