Skip to content

Commit

Permalink
Improve test
Browse files Browse the repository at this point in the history
  • Loading branch information
coderzc committed Dec 14, 2023
1 parent b6b3ea7 commit bbef1e6
Showing 1 changed file with 28 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -1871,14 +1871,41 @@ public void testReadCommittedWithCompaction() throws Exception{
.withTransactionTimeout(1, TimeUnit.MINUTES).build().get();
producer.newMessage(txn).key("K2").value("V2").send();
producer.newMessage(txn).key("K3").value("V3").send();
txn.commit();
txn.commit().get();

producer.newMessage().key("K1").value("V4").send();

Transaction txn2 = pulsarClient.newTransaction()
.withTransactionTimeout(1, TimeUnit.MINUTES).build().get();
producer.newMessage(txn2).key("K2").value("V5").send();
producer.newMessage(txn2).key("K3").value("V6").send();
txn2.commit().get();

admin.topics().triggerCompaction(topic);

Awaitility.await().untilAsserted(() -> {
assertEquals(admin.topics().compactionStatus(topic).status,
LongRunningProcessStatus.Status.SUCCESS);
});

@Cleanup
Consumer<String> consumer = this.pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("sub")
.subscriptionType(SubscriptionType.Exclusive)
.readCompacted(true)
.subscribe();
List<String> result = new ArrayList<>();
while (true) {
Message<String> receive = consumer.receive(2, TimeUnit.SECONDS);
if (receive == null) {
break;
}

result.add(receive.getValue());
}

Assert.assertEquals(result, List.of("V4", "V5", "V6"));
}

}

0 comments on commit bbef1e6

Please sign in to comment.