diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java index 6965ac28068c1..0972c9098b55b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.broker.service; -import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.doAnswer; @@ -383,7 +382,7 @@ public static Set getBrokerEntryMetadataIntercep * * @throws Exception */ - @Test(groups = "flaky") + @Test void testMessageExpiryWithTimestampNonRecoverableException() throws Exception { final String ledgerAndCursorName = "testPersistentMessageExpiryWithNonRecoverableLedgers"; @@ -402,11 +401,15 @@ void testMessageExpiryWithTimestampNonRecoverableException() throws Exception { for (int i = 0; i < totalEntries; i++) { ledger.addEntry(createMessageWrittenToLedger("msg" + i)); } + Awaitility.await().untilAsserted(() -> + assertEquals(ledger.getState(), ManagedLedgerImpl.State.LedgerOpened)); List ledgers = ledger.getLedgersInfoAsList(); LedgerInfo lastLedgerInfo = ledgers.get(ledgers.size() - 1); - - assertEquals(ledgers.size(), totalEntries / entriesPerLedger); + // The `lastLedgerInfo` should be newly opened, and it does not contain any entries. + // Please refer to: https://github.com/apache/pulsar/pull/22034 + assertEquals(lastLedgerInfo.getEntries(), 0); + assertEquals(ledgers.size(), totalEntries / entriesPerLedger + 1); // this will make sure that all entries should be deleted Thread.sleep(TimeUnit.SECONDS.toMillis(ttlSeconds)); @@ -420,19 +423,13 @@ void testMessageExpiryWithTimestampNonRecoverableException() throws Exception { when(mock.getLastPosition()).thenReturn(PositionImpl.EARLIEST); PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null); - Position previousMarkDelete = null; - for (int i = 0; i < totalEntries; i++) { - monitor.expireMessages(1); - Position previousPos = previousMarkDelete; - retryStrategically( - (test) -> c1.getMarkDeletedPosition() != null && !c1.getMarkDeletedPosition().equals(previousPos), - 5, 100); - previousMarkDelete = c1.getMarkDeletedPosition(); - } - - PositionImpl markDeletePosition = (PositionImpl) c1.getMarkDeletedPosition(); - assertEquals(lastLedgerInfo.getLedgerId(), markDeletePosition.getLedgerId()); - assertEquals(lastLedgerInfo.getEntries() - 1, markDeletePosition.getEntryId()); + assertTrue(monitor.expireMessages(ttlSeconds)); + Awaitility.await().untilAsserted(() -> { + PositionImpl markDeletePosition = (PositionImpl) c1.getMarkDeletedPosition(); + // The markDeletePosition points to the last entry of the previous ledger in lastLedgerInfo. + assertEquals(markDeletePosition.getLedgerId(), lastLedgerInfo.getLedgerId() - 1); + assertEquals(markDeletePosition.getEntryId(), entriesPerLedger - 1); + }); c1.close(); ledger.close(); @@ -440,20 +437,25 @@ void testMessageExpiryWithTimestampNonRecoverableException() throws Exception { } - @Test(groups = "flaky") + @Test public void testIncorrectClientClock() throws Exception { final String ledgerAndCursorName = "testIncorrectClientClock"; int maxTTLSeconds = 1; + int entriesNum = 10; ManagedLedgerConfig config = new ManagedLedgerConfig(); config.setMaxEntriesPerLedger(1); ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(ledgerAndCursorName, config); ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName); // set client clock to 10 days later long incorrectPublishTimestamp = System.currentTimeMillis() + TimeUnit.DAYS.toMillis(10); - for (int i = 0; i < 10; i++) { + for (int i = 0; i < entriesNum; i++) { ledger.addEntry(createMessageWrittenToLedger("msg" + i, incorrectPublishTimestamp)); } - assertEquals(ledger.getLedgersInfoAsList().size(), 10); + Awaitility.await().untilAsserted(() -> + assertEquals(ledger.getState(), ManagedLedgerImpl.State.LedgerOpened)); + // The number of ledgers should be (entriesNum / MaxEntriesPerLedger) + 1 + // Please refer to: https://github.com/apache/pulsar/pull/22034 + assertEquals(ledger.getLedgersInfoAsList().size(), entriesNum + 1); PersistentTopic mock = mock(PersistentTopic.class); when(mock.getName()).thenReturn("topicname"); when(mock.getLastPosition()).thenReturn(PositionImpl.EARLIEST);