Skip to content

Commit

Permalink
[fix][broker]Delete compacted ledger when topic is deleted (#21745) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
coderzc authored Jan 10, 2024
1 parent ab7b3ef commit fe5f3b8
Show file tree
Hide file tree
Showing 5 changed files with 212 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,16 @@
import static com.google.common.base.Preconditions.checkArgument;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.compaction.CompactedTopic;
import org.apache.pulsar.compaction.CompactedTopicContext;
import org.apache.pulsar.compaction.CompactedTopicImpl;
import org.apache.pulsar.compaction.Compactor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -106,5 +109,19 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
}
}

CompletableFuture<Void> cleanCompactedLedger() {
final CompletableFuture<CompactedTopicContext> compactedTopicContextFuture =
((CompactedTopicImpl) compactedTopic).getCompactedTopicContextFuture();
if (compactedTopicContextFuture != null) {
return compactedTopicContextFuture.thenCompose(context -> {
long compactedLedgerId = context.getLedger().getId();
((CompactedTopicImpl) compactedTopic).reset();
return compactedTopic.deleteCompactedLedger(compactedLedgerId);
});
} else {
return CompletableFuture.completedFuture(null);
}
}

private static final Logger log = LoggerFactory.getLogger(CompactorSubscription.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,8 @@ public static boolean isDedupCursorName(String name) {
protected final MessageDeduplication messageDeduplication;

private static final long COMPACTION_NEVER_RUN = -0xfebecffeL;
private CompletableFuture<Long> currentCompaction = CompletableFuture.completedFuture(COMPACTION_NEVER_RUN);
private volatile CompletableFuture<Long> currentCompaction = CompletableFuture.completedFuture(
COMPACTION_NEVER_RUN);
private final CompactedTopic compactedTopic;

private CompletableFuture<MessageIdImpl> currentOffload = CompletableFuture.completedFuture(
Expand Down Expand Up @@ -1031,13 +1032,13 @@ public CompletableFuture<Void> unsubscribe(String subscriptionName) {
new AsyncCallbacks.DeleteLedgerCallback() {
@Override
public void deleteLedgerComplete(Object ctx) {
asyncDeleteCursor(subscriptionName, unsubscribeFuture);
asyncDeleteCursorWithCleanCompactionLedger(subscriptionName, unsubscribeFuture);
}

@Override
public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
if (exception instanceof MetadataNotFoundException) {
asyncDeleteCursor(subscriptionName, unsubscribeFuture);
asyncDeleteCursorWithCleanCompactionLedger(subscriptionName, unsubscribeFuture);
return;
}

Expand All @@ -1047,12 +1048,41 @@ public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
}
}, null);
} else {
asyncDeleteCursor(subscriptionName, unsubscribeFuture);
asyncDeleteCursorWithCleanCompactionLedger(subscriptionName, unsubscribeFuture);
}

return unsubscribeFuture;
}

private void asyncDeleteCursorWithCleanCompactionLedger(String subscriptionName,
CompletableFuture<Void> unsubscribeFuture) {
PersistentSubscription subscription = subscriptions.get(subscriptionName);
if (subscription == null) {
log.warn("[{}][{}] Can't find subscription, skip delete cursor", topic, subscriptionName);
unsubscribeFuture.complete(null);
return;
}

if ((!isCompactionSubscription(subscriptionName)) || !(subscription instanceof CompactorSubscription)) {
asyncDeleteCursor(subscriptionName, unsubscribeFuture);
return;
}

currentCompaction.handle((__, e) -> {
if (e != null) {
log.warn("[{}][{}] Last compaction task failed", topic, subscriptionName);
}
return ((CompactorSubscription) subscription).cleanCompactedLedger();
}).whenComplete((__, ex) -> {
if (ex != null) {
log.error("[{}][{}] Error cleaning compacted ledger", topic, subscriptionName, ex);
unsubscribeFuture.completeExceptionally(ex);
} else {
asyncDeleteCursor(subscriptionName, unsubscribeFuture);
}
});
}

private void asyncDeleteCursor(String subscriptionName, CompletableFuture<Void> unsubscribeFuture) {
ledger.asyncDeleteCursor(Codec.encode(subscriptionName), new DeleteCursorCallback() {
@Override
Expand Down Expand Up @@ -2844,11 +2874,24 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
public synchronized void triggerCompaction()
throws PulsarServerException, AlreadyRunningException {
if (currentCompaction.isDone()) {
currentCompaction = brokerService.pulsar().getCompactor().compact(topic);
if (!lock.readLock().tryLock()) {
log.info("[{}] Conflict topic-close, topic-delete, skip triggering compaction", topic);
return;
}
try {
if (isClosingOrDeleting) {
log.info("[{}] Topic is closing or deleting, skip triggering compaction", topic);
return;
}

currentCompaction = brokerService.pulsar().getCompactor().compact(topic);
} finally {
lock.readLock().unlock();
}
currentCompaction.whenComplete((ignore, ex) -> {
if (ex != null){
log.warn("[{}] Compaction failure.", topic, ex);
}
if (ex != null) {
log.warn("[{}] Compaction failure.", topic, ex);
}
});
} else {
throw new AlreadyRunningException("Compaction already in progress");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public RawReaderImpl(PulsarClientImpl client, String topic, String subscription,
consumerConfiguration.setReceiverQueueSize(DEFAULT_RECEIVER_QUEUE_SIZE);
consumerConfiguration.setReadCompacted(true);
consumerConfiguration.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
consumerConfiguration.setAckReceiptEnabled(true);

consumer = new RawConsumerImpl(client, consumerConfiguration,
consumerFuture);
Expand Down Expand Up @@ -122,7 +123,7 @@ static class RawConsumerImpl extends ConsumerImpl<byte[]> {
MessageId.earliest,
0 /* startMessageRollbackDurationInSec */,
Schema.BYTES, null,
true
false
);
incomingRawMessages = new GrowableArrayBlockingQueue<>();
pendingRawReceives = new ConcurrentLinkedQueue<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nullable;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
Expand Down Expand Up @@ -330,6 +331,16 @@ private static int comparePositionAndMessageId(PositionImpl p, MessageIdData m)
public synchronized Optional<Position> getCompactionHorizon() {
return Optional.ofNullable(this.compactionHorizon);
}

public void reset() {
this.compactionHorizon = null;
this.compactedTopicContext = null;
}

@Nullable
public CompletableFuture<CompactedTopicContext> getCompactedTopicContextFuture() {
return compactedTopicContext;
}
private static final Logger log = LoggerFactory.getLogger(CompactedTopicImpl.class);
}

Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
Expand All @@ -47,15 +48,19 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import lombok.Cleanup;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.api.OpenBuilder;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.Position;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.BrokerTestUtil;
Expand All @@ -64,6 +69,7 @@
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.persistent.SystemTopic;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
Expand All @@ -84,6 +90,7 @@
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
Expand Down Expand Up @@ -1969,4 +1976,128 @@ public void testCompactionDuplicate() throws Exception {
}
}
}

@Test
public void testDeleteCompactedLedger() throws Exception {
String topicName = "persistent://my-property/use/my-ns/testDeleteCompactedLedger";

final String subName = "my-sub";
@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.enableBatching(false).topic(topicName).create();

pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).readCompacted(true).subscribe().close();

for (int i = 0; i < 10; i++) {
producer.newMessage().key(String.valueOf(i % 2)).value(String.valueOf(i)).sendAsync();
}
producer.flush();

Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
compactor.compact(topicName).get();

MutableLong compactedLedgerId = new MutableLong(-1);
Awaitility.await().untilAsserted(() -> {
PersistentTopicInternalStats stats = admin.topics().getInternalStats(topicName);
Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1L);
compactedLedgerId.setValue(stats.compactedLedger.ledgerId);
Assert.assertEquals(stats.compactedLedger.entries, 2L);
});

// delete compacted ledger
admin.topics().deleteSubscription(topicName, "__compaction");

Awaitility.await().untilAsserted(() -> {
PersistentTopicInternalStats stats = admin.topics().getInternalStats(topicName);
Assert.assertEquals(stats.compactedLedger.ledgerId, -1L);
Assert.assertEquals(stats.compactedLedger.entries, -1L);
assertThrows(BKException.BKNoSuchLedgerExistsException.class, () -> pulsar.getBookKeeperClient()
.openLedger(compactedLedgerId.getValue(), BookKeeper.DigestType.CRC32C, new byte[]{}));
});

compactor.compact(topicName).get();

MutableLong compactedLedgerId2 = new MutableLong(-1);
Awaitility.await().untilAsserted(() -> {
PersistentTopicInternalStats stats = admin.topics().getInternalStats(topicName);
Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1L);
compactedLedgerId2.setValue(stats.compactedLedger.ledgerId);
Assert.assertEquals(stats.compactedLedger.entries, 2L);
});

producer.close();
admin.topics().delete(topicName);

Awaitility.await().untilAsserted(() -> assertThrows(BKException.BKNoSuchLedgerExistsException.class,
() -> pulsar.getBookKeeperClient().openLedger(
compactedLedgerId2.getValue(), BookKeeper.DigestType.CRC32, new byte[]{})));
}

@Test
public void testDeleteCompactedLedgerWithSlowAck() throws Exception {
// Disable topic level policies, since block ack thread may also block thread of delete topic policies.
conf.setTopicLevelPoliciesEnabled(false);
restartBroker();

String topicName = "persistent://my-property/use/my-ns/testDeleteCompactedLedgerWithSlowAck";
@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.enableBatching(false).topic(topicName).create();

pulsarClient.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Exclusive)
.subscriptionName(Compactor.COMPACTION_SUBSCRIPTION)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).readCompacted(true).subscribe()
.close();

for (int i = 0; i < 10; i++) {
producer.newMessage().key(String.valueOf(i % 2)).value(String.valueOf(i)).sendAsync();
}
producer.flush();

PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
PersistentSubscription subscription = spy(topic.getSubscription(Compactor.COMPACTION_SUBSCRIPTION));
topic.getSubscriptions().put(Compactor.COMPACTION_SUBSCRIPTION, subscription);

AtomicLong compactedLedgerId = new AtomicLong(-1);
AtomicBoolean pauseAck = new AtomicBoolean();
Mockito.doAnswer(invocationOnMock -> {
Map<String, Long> properties = (Map<String, Long>) invocationOnMock.getArguments()[2];
log.info("acknowledgeMessage properties: {}", properties);
compactedLedgerId.set(properties.get(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY));
pauseAck.set(true);
while (pauseAck.get()) {
Thread.sleep(200);
}
return invocationOnMock.callRealMethod();
}).when(subscription).acknowledgeMessage(Mockito.any(), Mockito.eq(
CommandAck.AckType.Cumulative), Mockito.any());

admin.topics().triggerCompaction(topicName);

while (!pauseAck.get()) {
Thread.sleep(100);
}

CompletableFuture<Long> currentCompaction =
(CompletableFuture<Long>) FieldUtils.readDeclaredField(topic, "currentCompaction", true);
CompletableFuture<Long> spyCurrentCompaction = spy(currentCompaction);
FieldUtils.writeDeclaredField(topic, "currentCompaction", spyCurrentCompaction, true);
currentCompaction.whenComplete((obj, throwable) -> {
if (throwable != null) {
spyCurrentCompaction.completeExceptionally(throwable);
} else {
spyCurrentCompaction.complete(obj);
}
});
Mockito.doAnswer(invocationOnMock -> {
pauseAck.set(false);
return invocationOnMock.callRealMethod();
}).when(spyCurrentCompaction).handle(Mockito.any());

admin.topics().delete(topicName, true);

Awaitility.await().untilAsserted(() -> assertThrows(BKException.BKNoSuchLedgerExistsException.class,
() -> pulsar.getBookKeeperClient().openLedger(
compactedLedgerId.get(), BookKeeper.DigestType.CRC32, new byte[]{})));
}
}

0 comments on commit fe5f3b8

Please sign in to comment.