Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

Commit

Permalink
[fix][broker] Fix get outdated compactedTopicContext after compaction…
Browse files Browse the repository at this point in the history
…Horizon has been updated (apache#20984)
  • Loading branch information
coderzc authored Aug 16, 2023
1 parent bfde0de commit 0cb1c78
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Note: If you want to guarantee that strong consistency between `compactionHorizon` and `compactedTopicContext`,
* you need to call getting them method in "synchronized(CompactedTopicImpl){ ... }" lock block.
*/
public class CompactedTopicImpl implements CompactedTopic {
static final long NEWER_THAN_COMPACTED = -0xfeed0fbaL;
static final long COMPACT_LEDGER_EMPTY = -0xfeed0fbbL;
Expand All @@ -70,14 +74,14 @@ public CompactedTopicImpl(BookKeeper bk) {
@Override
public CompletableFuture<CompactedTopicContext> newCompactedLedger(Position p, long compactedLedgerId) {
synchronized (this) {
compactionHorizon = (PositionImpl) p;

CompletableFuture<CompactedTopicContext> previousContext = compactedTopicContext;
compactedTopicContext = openCompactedLedger(bk, compactedLedgerId);

compactionHorizon = (PositionImpl) p;

// delete the ledger from the old context once the new one is open
return compactedTopicContext.thenCompose(__ ->
previousContext != null ? previousContext : CompletableFuture.completedFuture(null));
return compactedTopicContext.thenCompose(
__ -> previousContext != null ? previousContext : CompletableFuture.completedFuture(null));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.Pair;
Expand All @@ -62,6 +63,7 @@
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
Expand Down Expand Up @@ -840,4 +842,68 @@ public void testReadCompactedLatestMessageWithInclusive() throws Exception {
Assert.assertTrue(reader.hasMessageAvailable());
Assert.assertEquals(reader.readNext().getMessageId(), lastMessage.get());
}

@Test
public void testCompactWithConcurrentGetCompactionHorizonAndCompactedTopicContext() throws Exception {
BookKeeper bk0 = pulsar.getBookKeeperClientFactory().create(
this.conf, null, null, Optional.empty(), null);

final BookKeeper bk = Mockito.spy(bk0);

Mockito.doAnswer(invocation -> {
Thread.sleep(1500);
invocation.callRealMethod();
return null;
}).when(bk).asyncOpenLedger(Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());

LedgerHandle oldCompactedLedger = bk.createLedger(1, 1,
Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD);
oldCompactedLedger.close();
LedgerHandle newCompactedLedger = bk.createLedger(1, 1,
Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE,
Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD);
newCompactedLedger.close();

CompactedTopicImpl compactedTopic = new CompactedTopicImpl(bk);

PositionImpl oldHorizon = new PositionImpl(1, 2);
var future = CompletableFuture.supplyAsync(() -> {
// set the compacted topic ledger
return compactedTopic.newCompactedLedger(oldHorizon, oldCompactedLedger.getId());
});
Thread.sleep(500);

Optional<Position> compactionHorizon = compactedTopic.getCompactionHorizon();
CompletableFuture<CompactedTopicContext> compactedTopicContext =
compactedTopic.getCompactedTopicContextFuture();

if (compactedTopicContext != null) {
Assert.assertEquals(compactionHorizon.get(), oldHorizon);
Assert.assertNotNull(compactedTopicContext);
Assert.assertEquals(compactedTopicContext.join().ledger.getId(), oldCompactedLedger.getId());
} else {
Assert.assertTrue(compactionHorizon.isEmpty());
}

future.join();

PositionImpl newHorizon = new PositionImpl(1, 3);
var future2 = CompletableFuture.supplyAsync(() -> {
// update the compacted topic ledger
return compactedTopic.newCompactedLedger(newHorizon, newCompactedLedger.getId());
});
Thread.sleep(500);

compactionHorizon = compactedTopic.getCompactionHorizon();
compactedTopicContext = compactedTopic.getCompactedTopicContextFuture();

if (compactedTopicContext.join().ledger.getId() == newCompactedLedger.getId()) {
Assert.assertEquals(compactionHorizon.get(), newHorizon);
} else {
Assert.assertEquals(compactionHorizon.get(), oldHorizon);
}

future2.join();
}
}

0 comments on commit 0cb1c78

Please sign in to comment.