Skip to content

Commit

Permalink
[fix] [txn] Get previous position by managed ledger. (apache#22024)
Browse files Browse the repository at this point in the history
(cherry picked from commit 98ce27e)
  • Loading branch information
thetumbled authored and mukesh-ctds committed Mar 6, 2024
1 parent dfc75f5 commit 84eedda
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -287,8 +287,8 @@ private void handleTransactionMessage(TxnID txnId, Position position) {
.checkAbortedTransaction(txnId)) {
ongoingTxns.put(txnId, (PositionImpl) position);
PositionImpl firstPosition = ongoingTxns.get(ongoingTxns.firstKey());
//max read position is less than first ongoing transaction message position, so entryId -1
maxReadPosition = PositionImpl.get(firstPosition.getLedgerId(), firstPosition.getEntryId() - 1);
// max read position is less than first ongoing transaction message position
maxReadPosition = ((ManagedLedgerImpl) topic.getManagedLedger()).getPreviousPosition(firstPosition);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.TopicMessageId;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.impl.MessageIdImpl;
Expand Down Expand Up @@ -304,6 +306,62 @@ public void testGetLastMessageIdsWithOngoingTransactions() throws Exception {
assertMessageId(consumer, expectedLastMessageID2, 2);
}

/**
* produce 3 messages and then trigger a ledger switch,
* then create a transaction and send a transactional message.
* As there are messages in the new ledger, the reader should be able to read the messages.
* But reader.hasMessageAvailable() returns false if the entry id of max read position is -1.
* @throws Exception
*/
@Test
public void testGetLastMessageIdsWithOpenTransactionAtLedgerHead() throws Exception {
String topic = "persistent://" + NAMESPACE1 + "/testGetLastMessageIdsWithOpenTransactionAtLedgerHead";
String subName = "my-subscription";
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.create();
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName(subName)
.subscribe();
MessageId expectedLastMessageID = null;
for (int i = 0; i < 3; i++) {
expectedLastMessageID = producer.newMessage().value(String.valueOf(i).getBytes()).send();
System.out.println("expectedLastMessageID: " + expectedLastMessageID);
}
triggerLedgerSwitch(topic);
Transaction txn = pulsarClient.newTransaction()
.withTransactionTimeout(5, TimeUnit.HOURS)
.build()
.get();
producer.newMessage(txn).send();

Reader<byte[]> reader = pulsarClient.newReader()
.topic(topic)
.startMessageId(MessageId.earliest)
.create();
assertTrue(reader.hasMessageAvailable());
}

private void triggerLedgerSwitch(String topicName) throws Exception{
admin.topics().unload(topicName);
Awaitility.await().until(() -> {
CompletableFuture<Optional<Topic>> topicFuture =
getPulsarServiceList().get(0).getBrokerService().getTopic(topicName, false);
if (!topicFuture.isDone() || topicFuture.isCompletedExceptionally()){
return false;
}
Optional<Topic> topicOptional = topicFuture.join();
if (!topicOptional.isPresent()){
return false;
}
PersistentTopic persistentTopic = (PersistentTopic) topicOptional.get();
ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
return managedLedger.getState() == ManagedLedgerImpl.State.LedgerOpened;
});
}

private void assertMessageId(Consumer<?> consumer, MessageIdImpl expected, int entryOffset) throws Exception {
TopicMessageIdImpl actual = (TopicMessageIdImpl) consumer.getLastMessageIds().get(0);
assertEquals(expected.getEntryId(), actual.getEntryId() - entryOffset);
Expand Down

0 comments on commit 84eedda

Please sign in to comment.