Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker]Delete compacted ledger when topic is deleted (#21745) #21850

Merged
merged 2 commits into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,16 @@
import static com.google.common.base.Preconditions.checkArgument;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.compaction.CompactedTopic;
import org.apache.pulsar.compaction.CompactedTopicContext;
import org.apache.pulsar.compaction.CompactedTopicImpl;
import org.apache.pulsar.compaction.Compactor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -106,5 +109,19 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
}
}

CompletableFuture<Void> cleanCompactedLedger() {
final CompletableFuture<CompactedTopicContext> compactedTopicContextFuture =
((CompactedTopicImpl) compactedTopic).getCompactedTopicContextFuture();
if (compactedTopicContextFuture != null) {
return compactedTopicContextFuture.thenCompose(context -> {
long compactedLedgerId = context.getLedger().getId();
((CompactedTopicImpl) compactedTopic).reset();
return compactedTopic.deleteCompactedLedger(compactedLedgerId);
});
} else {
return CompletableFuture.completedFuture(null);
}
}

private static final Logger log = LoggerFactory.getLogger(CompactorSubscription.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,8 @@ public static boolean isDedupCursorName(String name) {
protected final MessageDeduplication messageDeduplication;

private static final long COMPACTION_NEVER_RUN = -0xfebecffeL;
private CompletableFuture<Long> currentCompaction = CompletableFuture.completedFuture(COMPACTION_NEVER_RUN);
private volatile CompletableFuture<Long> currentCompaction = CompletableFuture.completedFuture(
COMPACTION_NEVER_RUN);
private final CompactedTopic compactedTopic;

private CompletableFuture<MessageIdImpl> currentOffload = CompletableFuture.completedFuture(
Expand Down Expand Up @@ -1031,13 +1032,13 @@ public CompletableFuture<Void> unsubscribe(String subscriptionName) {
new AsyncCallbacks.DeleteLedgerCallback() {
@Override
public void deleteLedgerComplete(Object ctx) {
asyncDeleteCursor(subscriptionName, unsubscribeFuture);
asyncDeleteCursorWithCleanCompactionLedger(subscriptionName, unsubscribeFuture);
}

@Override
public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
if (exception instanceof MetadataNotFoundException) {
asyncDeleteCursor(subscriptionName, unsubscribeFuture);
asyncDeleteCursorWithCleanCompactionLedger(subscriptionName, unsubscribeFuture);
return;
}

Expand All @@ -1047,12 +1048,41 @@ public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
}
}, null);
} else {
asyncDeleteCursor(subscriptionName, unsubscribeFuture);
asyncDeleteCursorWithCleanCompactionLedger(subscriptionName, unsubscribeFuture);
}

return unsubscribeFuture;
}

private void asyncDeleteCursorWithCleanCompactionLedger(String subscriptionName,
CompletableFuture<Void> unsubscribeFuture) {
PersistentSubscription subscription = subscriptions.get(subscriptionName);
if (subscription == null) {
log.warn("[{}][{}] Can't find subscription, skip delete cursor", topic, subscriptionName);
unsubscribeFuture.complete(null);
return;
}

if ((!isCompactionSubscription(subscriptionName)) || !(subscription instanceof CompactorSubscription)) {
asyncDeleteCursor(subscriptionName, unsubscribeFuture);
return;
}

currentCompaction.handle((__, e) -> {
if (e != null) {
log.warn("[{}][{}] Last compaction task failed", topic, subscriptionName);
}
return ((CompactorSubscription) subscription).cleanCompactedLedger();
}).whenComplete((__, ex) -> {
if (ex != null) {
log.error("[{}][{}] Error cleaning compacted ledger", topic, subscriptionName, ex);
unsubscribeFuture.completeExceptionally(ex);
} else {
asyncDeleteCursor(subscriptionName, unsubscribeFuture);
}
});
}

private void asyncDeleteCursor(String subscriptionName, CompletableFuture<Void> unsubscribeFuture) {
ledger.asyncDeleteCursor(Codec.encode(subscriptionName), new DeleteCursorCallback() {
@Override
Expand Down Expand Up @@ -2824,11 +2854,24 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
public synchronized void triggerCompaction()
throws PulsarServerException, AlreadyRunningException {
if (currentCompaction.isDone()) {
currentCompaction = brokerService.pulsar().getCompactor().compact(topic);
if (!lock.readLock().tryLock()) {
log.info("[{}] Conflict topic-close, topic-delete, skip triggering compaction", topic);
return;
}
try {
if (isClosingOrDeleting) {
log.info("[{}] Topic is closing or deleting, skip triggering compaction", topic);
return;
}

currentCompaction = brokerService.pulsar().getCompactor().compact(topic);
} finally {
lock.readLock().unlock();
}
currentCompaction.whenComplete((ignore, ex) -> {
if (ex != null){
log.warn("[{}] Compaction failure.", topic, ex);
}
if (ex != null) {
log.warn("[{}] Compaction failure.", topic, ex);
}
});
} else {
throw new AlreadyRunningException("Compaction already in progress");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public RawReaderImpl(PulsarClientImpl client, String topic, String subscription,
consumerConfiguration.setReceiverQueueSize(DEFAULT_RECEIVER_QUEUE_SIZE);
consumerConfiguration.setReadCompacted(true);
consumerConfiguration.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
consumerConfiguration.setAckReceiptEnabled(true);

consumer = new RawConsumerImpl(client, consumerConfiguration,
consumerFuture);
Expand Down Expand Up @@ -122,7 +123,7 @@ static class RawConsumerImpl extends ConsumerImpl<byte[]> {
MessageId.earliest,
0 /* startMessageRollbackDurationInSec */,
Schema.BYTES, null,
true
false
);
incomingRawMessages = new GrowableArrayBlockingQueue<>();
pendingRawReceives = new ConcurrentLinkedQueue<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nullable;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
Expand Down Expand Up @@ -330,6 +331,16 @@ private static int comparePositionAndMessageId(PositionImpl p, MessageIdData m)
public synchronized Optional<Position> getCompactionHorizon() {
return Optional.ofNullable(this.compactionHorizon);
}

public void reset() {
this.compactionHorizon = null;
this.compactedTopicContext = null;
}

@Nullable
public CompletableFuture<CompactedTopicContext> getCompactedTopicContextFuture() {
return compactedTopicContext;
}
private static final Logger log = LoggerFactory.getLogger(CompactedTopicImpl.class);
}

Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/*
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
Expand All @@ -47,15 +48,19 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import lombok.Cleanup;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.api.OpenBuilder;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.Position;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.BrokerTestUtil;
Expand All @@ -64,6 +69,7 @@
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.persistent.SystemTopic;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
Expand All @@ -84,6 +90,7 @@
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
Expand Down Expand Up @@ -1969,4 +1976,128 @@ public void testCompactionDuplicate() throws Exception {
}
}
}

@Test
public void testDeleteCompactedLedger() throws Exception {
String topicName = "persistent://my-property/use/my-ns/testDeleteCompactedLedger";

final String subName = "my-sub";
@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.enableBatching(false).topic(topicName).create();

pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).readCompacted(true).subscribe().close();

for (int i = 0; i < 10; i++) {
producer.newMessage().key(String.valueOf(i % 2)).value(String.valueOf(i)).sendAsync();
}
producer.flush();

Compactor compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler);
compactor.compact(topicName).get();

MutableLong compactedLedgerId = new MutableLong(-1);
Awaitility.await().untilAsserted(() -> {
PersistentTopicInternalStats stats = admin.topics().getInternalStats(topicName);
Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1L);
compactedLedgerId.setValue(stats.compactedLedger.ledgerId);
Assert.assertEquals(stats.compactedLedger.entries, 2L);
});

// delete compacted ledger
admin.topics().deleteSubscription(topicName, "__compaction");

Awaitility.await().untilAsserted(() -> {
PersistentTopicInternalStats stats = admin.topics().getInternalStats(topicName);
Assert.assertEquals(stats.compactedLedger.ledgerId, -1L);
Assert.assertEquals(stats.compactedLedger.entries, -1L);
assertThrows(BKException.BKNoSuchLedgerExistsException.class, () -> pulsar.getBookKeeperClient()
.openLedger(compactedLedgerId.getValue(), BookKeeper.DigestType.CRC32C, new byte[]{}));
});

compactor.compact(topicName).get();

MutableLong compactedLedgerId2 = new MutableLong(-1);
Awaitility.await().untilAsserted(() -> {
PersistentTopicInternalStats stats = admin.topics().getInternalStats(topicName);
Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1L);
compactedLedgerId2.setValue(stats.compactedLedger.ledgerId);
Assert.assertEquals(stats.compactedLedger.entries, 2L);
});

producer.close();
admin.topics().delete(topicName);

Awaitility.await().untilAsserted(() -> assertThrows(BKException.BKNoSuchLedgerExistsException.class,
() -> pulsar.getBookKeeperClient().openLedger(
compactedLedgerId2.getValue(), BookKeeper.DigestType.CRC32, new byte[]{})));
}

@Test
public void testDeleteCompactedLedgerWithSlowAck() throws Exception {
// Disable topic level policies, since block ack thread may also block thread of delete topic policies.
conf.setTopicLevelPoliciesEnabled(false);
restartBroker();

String topicName = "persistent://my-property/use/my-ns/testDeleteCompactedLedgerWithSlowAck";
@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.enableBatching(false).topic(topicName).create();

pulsarClient.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Exclusive)
.subscriptionName(Compactor.COMPACTION_SUBSCRIPTION)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).readCompacted(true).subscribe()
.close();

for (int i = 0; i < 10; i++) {
producer.newMessage().key(String.valueOf(i % 2)).value(String.valueOf(i)).sendAsync();
}
producer.flush();

PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
PersistentSubscription subscription = spy(topic.getSubscription(Compactor.COMPACTION_SUBSCRIPTION));
topic.getSubscriptions().put(Compactor.COMPACTION_SUBSCRIPTION, subscription);

AtomicLong compactedLedgerId = new AtomicLong(-1);
AtomicBoolean pauseAck = new AtomicBoolean();
Mockito.doAnswer(invocationOnMock -> {
Map<String, Long> properties = (Map<String, Long>) invocationOnMock.getArguments()[2];
log.info("acknowledgeMessage properties: {}", properties);
compactedLedgerId.set(properties.get(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY));
pauseAck.set(true);
while (pauseAck.get()) {
Thread.sleep(200);
}
return invocationOnMock.callRealMethod();
}).when(subscription).acknowledgeMessage(Mockito.any(), Mockito.eq(
CommandAck.AckType.Cumulative), Mockito.any());

admin.topics().triggerCompaction(topicName);

while (!pauseAck.get()) {
Thread.sleep(100);
}

CompletableFuture<Long> currentCompaction =
(CompletableFuture<Long>) FieldUtils.readDeclaredField(topic, "currentCompaction", true);
CompletableFuture<Long> spyCurrentCompaction = spy(currentCompaction);
FieldUtils.writeDeclaredField(topic, "currentCompaction", spyCurrentCompaction, true);
currentCompaction.whenComplete((obj, throwable) -> {
if (throwable != null) {
spyCurrentCompaction.completeExceptionally(throwable);
} else {
spyCurrentCompaction.complete(obj);
}
});
Mockito.doAnswer(invocationOnMock -> {
pauseAck.set(false);
return invocationOnMock.callRealMethod();
}).when(spyCurrentCompaction).handle(Mockito.any());

admin.topics().delete(topicName, true);

Awaitility.await().untilAsserted(() -> assertThrows(BKException.BKNoSuchLedgerExistsException.class,
() -> pulsar.getBookKeeperClient().openLedger(
compactedLedgerId.get(), BookKeeper.DigestType.CRC32, new byte[]{})));
}
}
Loading