Skip to content

Commit

Permalink
[improve][broker] Optimize the performance of individual acknowledgme…
Browse files Browse the repository at this point in the history
…nts (apache#23072)

(cherry picked from commit 77b6378)
(cherry picked from commit 487913e)
  • Loading branch information
shibd authored and srinath-ctds committed Jul 30, 2024
1 parent 148077c commit f5eebae
Showing 1 changed file with 69 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
Expand Down Expand Up @@ -506,14 +507,16 @@ public CompletableFuture<Void> messageAcked(CommandAck ack) {

//this method is for individual ack not carry the transaction
private CompletableFuture<Long> individualAckNormal(CommandAck ack, Map<String, Long> properties) {
List<Position> positionsAcked = new ArrayList<>();
List<Pair<Consumer, Position>> positionsAcked = new ArrayList<>();
long totalAckCount = 0;
for (int i = 0; i < ack.getMessageIdsCount(); i++) {
MessageIdData msgId = ack.getMessageIdAt(i);
PositionImpl position;
long ackedCount = 0;
long batchSize = getBatchSize(msgId);
Consumer ackOwnerConsumer = getAckOwnerConsumer(msgId.getLedgerId(), msgId.getEntryId());
Pair<Consumer, Long> ackOwnerConsumerAndBatchSize =
getAckOwnerConsumerAndBatchSize(msgId.getLedgerId(), msgId.getEntryId());
Consumer ackOwnerConsumer = ackOwnerConsumerAndBatchSize.getLeft();
long ackedCount;
long batchSize = ackOwnerConsumerAndBatchSize.getRight();
if (msgId.getAckSetsCount() > 0) {
long[] ackSets = new long[msgId.getAckSetsCount()];
for (int j = 0; j < msgId.getAckSetsCount(); j++) {
Expand All @@ -532,28 +535,32 @@ private CompletableFuture<Long> individualAckNormal(CommandAck ack, Map<String,
} else {
position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());
ackedCount = getAckedCountForMsgIdNoAckSets(batchSize, position, ackOwnerConsumer);
if (checkCanRemovePendingAcksAndHandle(position, msgId)) {
if (checkCanRemovePendingAcksAndHandle(ackOwnerConsumer, position, msgId)) {
addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
}
}

positionsAcked.add(position);
positionsAcked.add(Pair.of(ackOwnerConsumer, position));

checkAckValidationError(ack, position);

totalAckCount += ackedCount;
}
subscription.acknowledgeMessage(positionsAcked, AckType.Individual, properties);
subscription.acknowledgeMessage(positionsAcked.stream()
.map(Pair::getRight)
.collect(Collectors.toList()), AckType.Individual, properties);
CompletableFuture<Long> completableFuture = new CompletableFuture<>();
completableFuture.complete(totalAckCount);
if (isTransactionEnabled() && Subscription.isIndividualAckMode(subType)) {
completableFuture.whenComplete((v, e) -> positionsAcked.forEach(position -> {
completableFuture.whenComplete((v, e) -> positionsAcked.forEach(positionPair -> {
Consumer ackOwnerConsumer = positionPair.getLeft();
Position position = positionPair.getRight();
//check if the position can remove from the consumer pending acks.
// the bit set is empty in pending ack handle.
if (((PositionImpl) position).getAckSet() != null) {
if (((PersistentSubscription) subscription)
.checkIsCanDeleteConsumerPendingAck((PositionImpl) position)) {
removePendingAcks((PositionImpl) position);
removePendingAcks(ackOwnerConsumer, (PositionImpl) position);
}
}
}));
Expand All @@ -565,7 +572,7 @@ private CompletableFuture<Long> individualAckNormal(CommandAck ack, Map<String,
//this method is for individual ack carry the transaction
private CompletableFuture<Long> individualAckWithTransaction(CommandAck ack) {
// Individual ack
List<MutablePair<PositionImpl, Integer>> positionsAcked = new ArrayList<>();
List<Pair<Consumer, MutablePair<PositionImpl, Integer>>> positionsAcked = new ArrayList<>();
if (!isTransactionEnabled()) {
return FutureUtil.failedFuture(
new BrokerServiceException.NotAllowedException("Server don't support transaction ack!"));
Expand All @@ -575,20 +582,23 @@ private CompletableFuture<Long> individualAckWithTransaction(CommandAck ack) {
for (int i = 0; i < ack.getMessageIdsCount(); i++) {
MessageIdData msgId = ack.getMessageIdAt(i);
PositionImpl position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());
Consumer ackOwnerConsumer = getAckOwnerConsumerAndBatchSize(msgId.getLedgerId(),
msgId.getEntryId()).getLeft();
// acked count at least one
long ackedCount = 0;
long batchSize = 0;
long ackedCount;
long batchSize;
if (msgId.hasBatchSize()) {
batchSize = msgId.getBatchSize();
// ack batch messages set ackeCount = batchSize
ackedCount = msgId.getBatchSize();
positionsAcked.add(new MutablePair<>(position, msgId.getBatchSize()));
positionsAcked.add(Pair.of(ackOwnerConsumer, new MutablePair<>(position, msgId.getBatchSize())));
} else {
// ack no batch message set ackedCount = 1
batchSize = 0;
ackedCount = 1;
positionsAcked.add(new MutablePair<>(position, (int) batchSize));
positionsAcked.add(Pair.of(ackOwnerConsumer, new MutablePair<>(position, (int) batchSize)));
}
Consumer ackOwnerConsumer = getAckOwnerConsumer(msgId.getLedgerId(), msgId.getEntryId());

if (msgId.getAckSetsCount() > 0) {
long[] ackSets = new long[msgId.getAckSetsCount()];
for (int j = 0; j < msgId.getAckSetsCount(); j++) {
Expand All @@ -600,47 +610,31 @@ private CompletableFuture<Long> individualAckWithTransaction(CommandAck ack) {

addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);

checkCanRemovePendingAcksAndHandle(position, msgId);
checkCanRemovePendingAcksAndHandle(ackOwnerConsumer, position, msgId);

checkAckValidationError(ack, position);

totalAckCount.add(ackedCount);
}

CompletableFuture<Void> completableFuture = transactionIndividualAcknowledge(ack.getTxnidMostBits(),
ack.getTxnidLeastBits(), positionsAcked);
ack.getTxnidLeastBits(), positionsAcked.stream().map(Pair::getRight).collect(Collectors.toList()));
if (Subscription.isIndividualAckMode(subType)) {
completableFuture.whenComplete((v, e) ->
positionsAcked.forEach(positionLongMutablePair -> {
positionsAcked.forEach(positionPair -> {
Consumer ackOwnerConsumer = positionPair.getLeft();
MutablePair<PositionImpl, Integer> positionLongMutablePair = positionPair.getRight();
if (positionLongMutablePair.getLeft().getAckSet() != null) {
if (((PersistentSubscription) subscription)
.checkIsCanDeleteConsumerPendingAck(positionLongMutablePair.left)) {
removePendingAcks(positionLongMutablePair.left);
removePendingAcks(ackOwnerConsumer, positionLongMutablePair.left);
}
}
}));
}
return completableFuture.thenApply(__ -> totalAckCount.sum());
}

private long getBatchSize(MessageIdData msgId) {
long batchSize = 1;
if (Subscription.isIndividualAckMode(subType)) {
LongPair longPair = pendingAcks.get(msgId.getLedgerId(), msgId.getEntryId());
// Consumer may ack the msg that not belongs to it.
if (longPair == null) {
Consumer ackOwnerConsumer = getAckOwnerConsumer(msgId.getLedgerId(), msgId.getEntryId());
longPair = ackOwnerConsumer.getPendingAcks().get(msgId.getLedgerId(), msgId.getEntryId());
if (longPair != null) {
batchSize = longPair.first;
}
} else {
batchSize = longPair.first;
}
}
return batchSize;
}

private long getAckedCountForMsgIdNoAckSets(long batchSize, PositionImpl position, Consumer consumer) {
if (isAcknowledgmentAtBatchIndexLevelEnabled && Subscription.isIndividualAckMode(subType)) {
long[] cursorAckSet = getCursorAckSet(position);
Expand Down Expand Up @@ -700,26 +694,39 @@ private void checkAckValidationError(CommandAck ack, PositionImpl position) {
}
}

private boolean checkCanRemovePendingAcksAndHandle(PositionImpl position, MessageIdData msgId) {
private boolean checkCanRemovePendingAcksAndHandle(Consumer ackOwnedConsumer,
PositionImpl position, MessageIdData msgId) {
if (Subscription.isIndividualAckMode(subType) && msgId.getAckSetsCount() == 0) {
return removePendingAcks(position);
return removePendingAcks(ackOwnedConsumer, position);
}
return false;
}

private Consumer getAckOwnerConsumer(long ledgerId, long entryId) {
Consumer ackOwnerConsumer = this;
/**
* Retrieves the acknowledgment owner consumer and batch size for the specified ledgerId and entryId.
*
* @param ledgerId The ID of the ledger.
* @param entryId The ID of the entry.
* @return Pair<Consumer, BatchSize>
*/
private Pair<Consumer, Long> getAckOwnerConsumerAndBatchSize(long ledgerId, long entryId) {
if (Subscription.isIndividualAckMode(subType)) {
if (!getPendingAcks().containsKey(ledgerId, entryId)) {
LongPair longPair = getPendingAcks().get(ledgerId, entryId);
if (longPair != null) {
return Pair.of(this, longPair.first);
} else {
// If there are more consumers, this step will consume more CPU, and it should be optimized later.
for (Consumer consumer : subscription.getConsumers()) {
if (consumer != this && consumer.getPendingAcks().containsKey(ledgerId, entryId)) {
ackOwnerConsumer = consumer;
break;
if (consumer != this) {
longPair = consumer.getPendingAcks().get(ledgerId, entryId);
if (longPair != null) {
return Pair.of(consumer, longPair.first);
}
}
}
}
}
return ackOwnerConsumer;
return Pair.of(this, 1L);
}

private long[] getCursorAckSet(PositionImpl position) {
Expand Down Expand Up @@ -971,44 +978,24 @@ public int hashCode() {
*
* @param position
*/
private boolean removePendingAcks(PositionImpl position) {
Consumer ackOwnedConsumer = null;
if (pendingAcks.get(position.getLedgerId(), position.getEntryId()) == null) {
for (Consumer consumer : subscription.getConsumers()) {
if (!consumer.equals(this) && consumer.getPendingAcks().containsKey(position.getLedgerId(),
position.getEntryId())) {
ackOwnedConsumer = consumer;
break;
}
}
} else {
ackOwnedConsumer = this;
private boolean removePendingAcks(Consumer ackOwnedConsumer, PositionImpl position) {
if (!ackOwnedConsumer.getPendingAcks().remove(position.getLedgerId(), position.getEntryId())) {
// Message was already removed by the other consumer
return false;
}

// remove pending message from appropriate consumer and unblock unAckMsg-flow if requires
LongPair ackedPosition = ackOwnedConsumer != null
? ackOwnedConsumer.getPendingAcks().get(position.getLedgerId(), position.getEntryId())
: null;
if (ackedPosition != null) {
if (!ackOwnedConsumer.getPendingAcks().remove(position.getLedgerId(), position.getEntryId())) {
// Message was already removed by the other consumer
return false;
}
if (log.isDebugEnabled()) {
log.debug("[{}-{}] consumer {} received ack {}", topicName, subscription, consumerId, position);
}
// unblock consumer-throttling when limit check is disabled or receives half of maxUnackedMessages =>
// consumer can start again consuming messages
int unAckedMsgs = UNACKED_MESSAGES_UPDATER.get(ackOwnedConsumer);
if ((((unAckedMsgs <= getMaxUnackedMessages() / 2) && ackOwnedConsumer.blockedConsumerOnUnackedMsgs)
&& ackOwnedConsumer.shouldBlockConsumerOnUnackMsgs())
|| !shouldBlockConsumerOnUnackMsgs()) {
ackOwnedConsumer.blockedConsumerOnUnackedMsgs = false;
flowConsumerBlockedPermits(ackOwnedConsumer);
}
return true;
if (log.isDebugEnabled()) {
log.debug("[{}-{}] consumer {} received ack {}", topicName, subscription, consumerId, position);
}
return false;
// unblock consumer-throttling when limit check is disabled or receives half of maxUnackedMessages =>
// consumer can start again consuming messages
int unAckedMsgs = UNACKED_MESSAGES_UPDATER.get(ackOwnedConsumer);
if ((((unAckedMsgs <= getMaxUnackedMessages() / 2) && ackOwnedConsumer.blockedConsumerOnUnackedMsgs)
&& ackOwnedConsumer.shouldBlockConsumerOnUnackMsgs())
|| !shouldBlockConsumerOnUnackMsgs()) {
ackOwnedConsumer.blockedConsumerOnUnackedMsgs = false;
flowConsumerBlockedPermits(ackOwnedConsumer);
}
return true;
}

public ConcurrentLongLongPairHashMap getPendingAcks() {
Expand Down

0 comments on commit f5eebae

Please sign in to comment.