This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

[fix][broker] Fix message loss during topic compaction (apache#20980)
(cherry picked from commit 3ab420c)
coderzc committed Aug 14, 2023
1 parent 868ed0b commit 2dc14d4
Showing 3 changed files with 75 additions and 11 deletions.
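
In short, the fix changes how phase one of compaction accounts for the entries inside a batch message: an entry with a non-empty payload becomes the latest value for its key, an entry with an empty (null) payload acts as a tombstone that removes the key, and the whole batch is treated as deleted only when every message in it was a tombstone, rather than as soon as any single entry was replaced or deleted. The sketch below only illustrates that accounting rule; BatchEntry and the plain Map are hypothetical stand-ins for the broker's RawMessage/MessageId types and metrics hooks, not the code changed in this commit.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for one entry of a batch; the real code works on
// SingleMessageMetadata extracted from a RawMessage.
record BatchEntry(String key, int payloadSize, long entryId) {}

class CompactionAccountingSketch {

    // Mirrors the corrected rule: count deletions per entry and only treat the
    // whole batch as deleted when every message in it was a tombstone.
    static boolean processBatch(List<BatchEntry> batch, Map<String, Long> latestForKey) {
        int deleteCnt = 0;
        for (BatchEntry e : batch) {
            if (e.payloadSize() > 0) {
                // Non-empty payload: this entry is now the latest value for its key.
                latestForKey.put(e.key(), e.entryId());
            } else {
                // Empty payload is a tombstone: remove the key and count the deletion.
                latestForKey.remove(e.key());
                deleteCnt++;
            }
        }
        return deleteCnt == batch.size();
    }

    public static void main(String[] args) {
        Map<String, Long> latestForKey = new HashMap<>();
        List<BatchEntry> batch = List.of(
                new BatchEntry("key1", 12, 0L),
                new BatchEntry("key1", 0, 1L),   // tombstone for key1
                new BatchEntry("key2", 12, 2L));
        boolean wholeBatchDeleted = processBatch(batch, latestForKey);
        System.out.println(wholeBatchDeleted);                 // false: key2 still has a value
        System.out.println(latestForKey.containsKey("key1"));  // false: removed by the tombstone
        System.out.println(latestForKey.containsKey("key2"));  // true
    }
}

Applied to the first batch published by the new test further down (key1, a key1 tombstone, key2), this keeps key2, drops key1, and does not mark the whole batch as deleted, which is what testBatchMessageWithNullValue asserts after compaction.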
@@ -44,6 +44,10 @@ public class RawBatchConverter {
     public static boolean isReadableBatch(RawMessage msg) {
         ByteBuf payload = msg.getHeadersAndPayload();
         MessageMetadata metadata = Commands.parseMessageMetadata(payload);
+        return isReadableBatch(metadata);
+    }
+
+    public static boolean isReadableBatch(MessageMetadata metadata) {
         return metadata.hasNumMessagesInBatch() && metadata.getEncryptionKeysCount() == 0;
     }

@@ -71,9 +75,9 @@ public static List<ImmutableTriple<MessageId, String, Integer>> extractIdsAndKey
                     msg.getMessageIdData().getEntryId(),
                     msg.getMessageIdData().getPartition(),
                     i);
-            if (!smm.isCompactedOut()) {
+            if (!smm.isCompactedOut() && smm.hasPartitionKey()) {
                 idsAndKeysAndSize.add(ImmutableTriple.of(id,
-                        smm.hasPartitionKey() ? smm.getPartitionKey() : null,
+                        smm.getPartitionKey(),
                         smm.hasPayloadSize() ? smm.getPayloadSize() : 0));
             }
             singleMessagePayload.release();
@@ -122,26 +122,32 @@ private void phaseOneLoop(RawReader reader,
                 () -> FutureUtil.createTimeoutException("Timeout", getClass(), "phaseOneLoop(...)"));

         future.thenAcceptAsync(m -> {
-            try {
+            try (m) {
                 MessageId id = m.getMessageId();
                 boolean deletedMessage = false;
                 boolean replaceMessage = false;
                 mxBean.addCompactionReadOp(reader.getTopic(), m.getHeadersAndPayload().readableBytes());
-                if (RawBatchConverter.isReadableBatch(m)) {
+                MessageMetadata metadata = Commands.parseMessageMetadata(m.getHeadersAndPayload());
+                if (RawBatchConverter.isReadableBatch(metadata)) {
                     try {
+                        int numMessagesInBatch = metadata.getNumMessagesInBatch();
+                        int deleteCnt = 0;
                         for (ImmutableTriple<MessageId, String, Integer> e : extractIdsAndKeysAndSizeFromBatch(m)) {
                             if (e != null) {
                                 if (e.getRight() > 0) {
                                     MessageId old = latestForKey.put(e.getMiddle(), e.getLeft());
-                                    replaceMessage = old != null;
+                                    if (old != null) {
+                                        mxBean.addCompactionRemovedEvent(reader.getTopic());
+                                    }
                                 } else {
-                                    deletedMessage = true;
                                     latestForKey.remove(e.getMiddle());
+                                    deleteCnt++;
+                                    mxBean.addCompactionRemovedEvent(reader.getTopic());
                                 }
                             }
-                            if (replaceMessage || deletedMessage) {
-                                mxBean.addCompactionRemovedEvent(reader.getTopic());
-                            }
                         }
+                        if (deleteCnt == numMessagesInBatch) {
+                            deletedMessage = true;
+                        }
                     } catch (IOException ioe) {
                         log.info("Error decoding batch for message {}. Whole batch will be included in output",
@@ -174,8 +180,6 @@ private void phaseOneLoop(RawReader reader,
                             lastMessageId,
                             latestForKey, loopPromise);
                 }
-            } finally {
-                m.close();
             }
         }, scheduler).exceptionally(ex -> {
             loopPromise.completeExceptionally(ex);
@@ -48,6 +48,7 @@
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.api.OpenBuilder;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
@@ -96,6 +97,7 @@
 import org.testng.annotations.Test;

 @Test(groups = "broker-impl")
+@Slf4j
 public class CompactionTest extends MockedPulsarServiceBaseTest {
     protected ScheduledExecutorService compactionScheduler;
     protected BookKeeper bk;
@@ -553,6 +555,60 @@ public void testBatchMessageIdsDontChange() throws Exception {
         }
     }

+    @Test
+    public void testBatchMessageWithNullValue() throws Exception {
+        String topic = "persistent://my-property/use/my-ns/my-topic1";
+
+        pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
+                .receiverQueueSize(1).readCompacted(true).subscribe().close();
+
+        try (Producer<byte[]> producer = pulsarClient.newProducer().topic(topic)
+                .maxPendingMessages(3)
+                .enableBatching(true)
+                .batchingMaxMessages(3)
+                .batchingMaxPublishDelay(1, TimeUnit.HOURS)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create()
+        ) {
+            // batch 1
+            producer.newMessage().key("key1").value("my-message-1".getBytes()).sendAsync();
+            producer.newMessage().key("key1").value(null).sendAsync();
+            producer.newMessage().key("key2").value("my-message-3".getBytes()).send();
+
+            // batch 2
+            producer.newMessage().key("key3").value("my-message-4".getBytes()).sendAsync();
+            producer.newMessage().key("key3").value("my-message-5".getBytes()).sendAsync();
+            producer.newMessage().key("key3").value("my-message-6".getBytes()).send();
+
+            // batch 3
+            producer.newMessage().key("key4").value("my-message-7".getBytes()).sendAsync();
+            producer.newMessage().key("key4").value(null).sendAsync();
+            producer.newMessage().key("key5").value("my-message-9".getBytes()).send();
+        }
+
+
+        // compact the topic
+        compact(topic);
+
+        // Read messages before compaction to get ids
+        List<Message<byte[]>> messages = new ArrayList<>();
+        try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic)
+                .subscriptionName("sub1").receiverQueueSize(1).readCompacted(true).subscribe()) {
+            while (true) {
+                Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
+                if (message == null) {
+                    break;
+                }
+                messages.add(message);
+            }
+        }
+
+        assertEquals(messages.size(), 3);
+        assertEquals(messages.get(0).getKey(), "key2");
+        assertEquals(messages.get(1).getKey(), "key3");
+        assertEquals(messages.get(2).getKey(), "key5");
+    }
+
     @Test
     public void testWholeBatchCompactedOut() throws Exception {
         String topic = "persistent://my-property/use/my-ns/my-topic1";
