diff --git a/conf/broker.conf b/conf/broker.conf index e5d8a32e7171c..fc32246adea1f 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -563,6 +563,12 @@ brokerServiceCompactionPhaseOneLoopTimeInSeconds=30 # Whether retain null-key message during topic compaction topicCompactionRetainNullKey=false +# Class name of the factory that implements the topic compaction service. +# If the value is "org.apache.pulsar.compaction.EventTimeCompactionServiceFactory", +# the topic compaction service compacts based on the message eventTime. +# By default, the compaction service is based on the message publishing order. +compactionServiceFactoryClassName=org.apache.pulsar.compaction.PulsarCompactionServiceFactory + # Whether to enable the delayed delivery for messages. # If disabled, messages will be immediately delivered and there will # be no tracking overhead. diff --git a/conf/standalone.conf b/conf/standalone.conf index 30b39af8869d4..ae696410d86bf 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -1318,3 +1318,9 @@ disableBrokerInterceptors=true # Whether retain null-key message during topic compaction topicCompactionRetainNullKey=false + +# Class name of the factory that implements the topic compaction service. +# If the value is "org.apache.pulsar.compaction.EventTimeCompactionServiceFactory", +# the topic compaction service compacts based on the message eventTime. +# By default, the compaction service is based on the message publishing order. 
+compactionServiceFactoryClassName=org.apache.pulsar.compaction.PulsarCompactionServiceFactory \ No newline at end of file diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java index 4c24f6d303668..f41a7aedd59b2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java @@ -38,6 +38,7 @@ import org.apache.pulsar.common.compression.CompressionCodec; import org.apache.pulsar.common.compression.CompressionCodecProvider; import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.compaction.MessageCompactionData; public class RawBatchConverter { @@ -51,8 +52,8 @@ public static boolean isReadableBatch(MessageMetadata metadata) { return metadata.hasNumMessagesInBatch() && metadata.getEncryptionKeysCount() == 0; } - public static List> extractIdsAndKeysAndSize(RawMessage msg) - throws IOException { + public static List extractMessageCompactionData(RawMessage msg) + throws IOException { checkArgument(msg.getMessageIdData().getBatchIndex() == -1); ByteBuf payload = msg.getHeadersAndPayload(); @@ -64,25 +65,35 @@ public static List> extractIdsAndKey int uncompressedSize = metadata.getUncompressedSize(); ByteBuf uncompressedPayload = codec.decode(payload, uncompressedSize); - List> idsAndKeysAndSize = new ArrayList<>(); + List messageCompactionDataList = new ArrayList<>(); SingleMessageMetadata smm = new SingleMessageMetadata(); for (int i = 0; i < batchSize; i++) { ByteBuf singleMessagePayload = Commands.deSerializeSingleMessageInBatch(uncompressedPayload, - smm, - 0, batchSize); + smm, + 0, batchSize); MessageId id = new BatchMessageIdImpl(msg.getMessageIdData().getLedgerId(), - msg.getMessageIdData().getEntryId(), - msg.getMessageIdData().getPartition(), - i); + msg.getMessageIdData().getEntryId(), + 
msg.getMessageIdData().getPartition(), + i); if (!smm.isCompactedOut()) { - idsAndKeysAndSize.add(ImmutableTriple.of(id, - smm.hasPartitionKey() ? smm.getPartitionKey() : null, - smm.hasPayloadSize() ? smm.getPayloadSize() : 0)); + messageCompactionDataList.add(new MessageCompactionData(id, + smm.hasPartitionKey() ? smm.getPartitionKey() : null, + smm.hasPayloadSize() ? smm.getPayloadSize() : 0, smm.getEventTime())); } singleMessagePayload.release(); } uncompressedPayload.release(); + return messageCompactionDataList; + } + + public static List> extractIdsAndKeysAndSize( + RawMessage msg) + throws IOException { + List> idsAndKeysAndSize = new ArrayList<>(); + for (MessageCompactionData mcd : extractMessageCompactionData(msg)) { + idsAndKeysAndSize.add(ImmutableTriple.of(mcd.messageId(), mcd.key(), mcd.payloadSize())); + } return idsAndKeysAndSize; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java new file mode 100644 index 0000000000000..5b03f270251a0 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java @@ -0,0 +1,439 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.compaction; + +import io.netty.buffer.ByteBuf; +import java.io.IOException; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.function.BiPredicate; +import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.RawMessage; +import org.apache.pulsar.client.api.RawReader; +import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.client.impl.RawBatchConverter; +import org.apache.pulsar.common.api.proto.MessageMetadata; +import org.apache.pulsar.common.protocol.Commands; +import org.apache.pulsar.common.protocol.Markers; +import org.apache.pulsar.common.util.FutureUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Compaction will go through the topic in two passes. The first pass + * selects latest offset for each key in the topic. Then the second pass + * writes these values to a ledger. + * + *

The two passes are required to avoid holding the payloads of each of + * the latest values in memory, as the payload can be many orders of + * magnitude larger than a message id. + */ +public abstract class AbstractTwoPhaseCompactor extends Compactor { + + private static final Logger log = LoggerFactory.getLogger(AbstractTwoPhaseCompactor.class); + protected static final int MAX_OUTSTANDING = 500; + protected final Duration phaseOneLoopReadTimeout; + protected final boolean topicCompactionRetainNullKey; + + public AbstractTwoPhaseCompactor(ServiceConfiguration conf, + PulsarClient pulsar, + BookKeeper bk, + ScheduledExecutorService scheduler) { + super(conf, pulsar, bk, scheduler); + phaseOneLoopReadTimeout = Duration.ofSeconds( + conf.getBrokerServiceCompactionPhaseOneLoopTimeInSeconds()); + topicCompactionRetainNullKey = conf.isTopicCompactionRetainNullKey(); + } + + protected abstract Map toLatestMessageIdForKey(Map latestForKey); + + protected abstract boolean compactMessage(String topic, Map latestForKey, + RawMessage m, MessageId id); + + + protected abstract boolean compactBatchMessage(String topic, Map latestForKey, + RawMessage m, + MessageMetadata metadata, MessageId id); + + @Override + protected CompletableFuture doCompaction(RawReader reader, BookKeeper bk) { + return reader.hasMessageAvailableAsync() + .thenCompose(available -> { + if (available) { + return phaseOne(reader).thenCompose( + (r) -> phaseTwo(reader, r.from, r.to, r.lastReadId, toLatestMessageIdForKey(r.latestForKey), bk)); + } else { + log.info("Skip compaction of the empty topic {}", reader.getTopic()); + return CompletableFuture.completedFuture(-1L); + } + }); + } + + private CompletableFuture phaseOne(RawReader reader) { + Map latestForKey = new HashMap<>(); + CompletableFuture loopPromise = new CompletableFuture<>(); + + reader.getLastMessageIdAsync() + .thenAccept(lastMessageId -> { + log.info("Commencing phase one of compaction for {}, reading to {}", + reader.getTopic(), 
lastMessageId); + // Each entry is processed as a whole, discard the batchIndex part deliberately. + MessageIdImpl lastImpl = (MessageIdImpl) lastMessageId; + MessageIdImpl lastEntryMessageId = new MessageIdImpl(lastImpl.getLedgerId(), + lastImpl.getEntryId(), + lastImpl.getPartitionIndex()); + phaseOneLoop(reader, Optional.empty(), Optional.empty(), lastEntryMessageId, latestForKey, + loopPromise); + }).exceptionally(ex -> { + loopPromise.completeExceptionally(ex); + return null; + }); + + return loopPromise; + } + + private void phaseOneLoop(RawReader reader, + Optional firstMessageId, + Optional toMessageId, + MessageId lastMessageId, + Map latestForKey, + CompletableFuture loopPromise) { + if (loopPromise.isDone()) { + return; + } + CompletableFuture future = reader.readNextAsync(); + FutureUtil.addTimeoutHandling(future, + phaseOneLoopReadTimeout, scheduler, + () -> FutureUtil.createTimeoutException("Timeout", getClass(), "phaseOneLoop(...)")); + + future.thenAcceptAsync(m -> { + try (m) { + MessageId id = m.getMessageId(); + boolean deletedMessage = false; + mxBean.addCompactionReadOp(reader.getTopic(), m.getHeadersAndPayload().readableBytes()); + MessageMetadata metadata = Commands.parseMessageMetadata(m.getHeadersAndPayload()); + if (Markers.isServerOnlyMarker(metadata)) { + mxBean.addCompactionRemovedEvent(reader.getTopic()); + deletedMessage = true; + } else if (RawBatchConverter.isReadableBatch(metadata)) { + deletedMessage = compactBatchMessage(reader.getTopic(), latestForKey, m, metadata, id); + } else { + deletedMessage = compactMessage(reader.getTopic(), latestForKey, m, id); + } + MessageId first = firstMessageId.orElse(deletedMessage ? null : id); + MessageId to = deletedMessage ? toMessageId.orElse(null) : id; + if (id.compareTo(lastMessageId) == 0) { + loopPromise.complete(new PhaseOneResult(first == null ? id : first, to == null ? 
id : to, + lastMessageId, latestForKey)); + } else { + phaseOneLoop(reader, + Optional.ofNullable(first), + Optional.ofNullable(to), + lastMessageId, + latestForKey, loopPromise); + } + } + }, scheduler).exceptionally(ex -> { + loopPromise.completeExceptionally(ex); + return null; + }); + } + + private CompletableFuture phaseTwo(RawReader reader, MessageId from, MessageId to, + MessageId lastReadId, + Map latestForKey, BookKeeper bk) { + Map metadata = + LedgerMetadataUtils.buildMetadataForCompactedLedger(reader.getTopic(), to.toByteArray()); + return createLedger(bk, metadata).thenCompose((ledger) -> { + log.info( + "Commencing phase two of compaction for {}, from {} to {}, compacting {} keys to ledger {}", + reader.getTopic(), from, to, latestForKey.size(), ledger.getId()); + return phaseTwoSeekThenLoop(reader, from, to, lastReadId, latestForKey, bk, ledger); + }); + } + + private CompletableFuture phaseTwoSeekThenLoop(RawReader reader, MessageId from, + MessageId to, + MessageId lastReadId, Map latestForKey, BookKeeper bk, + LedgerHandle ledger) { + CompletableFuture promise = new CompletableFuture<>(); + + reader.seekAsync(from).thenCompose((v) -> { + Semaphore outstanding = new Semaphore(MAX_OUTSTANDING); + CompletableFuture loopPromise = new CompletableFuture<>(); + phaseTwoLoop(reader, to, latestForKey, ledger, outstanding, loopPromise, MessageId.earliest); + return loopPromise; + }).thenCompose((v) -> closeLedger(ledger)) + .thenCompose((v) -> reader.acknowledgeCumulativeAsync(lastReadId, + Map.of(COMPACTED_TOPIC_LEDGER_PROPERTY, ledger.getId()))) + .whenComplete((res, exception) -> { + if (exception != null) { + deleteLedger(bk, ledger).whenComplete((res2, exception2) -> { + if (exception2 != null) { + log.warn("Cleanup of ledger {} for failed", ledger, exception2); + } + // complete with original exception + promise.completeExceptionally(exception); + }); + } else { + promise.complete(ledger.getId()); + } + }); + return promise; + } + + private void 
phaseTwoLoop(RawReader reader, MessageId to, Map latestForKey, + LedgerHandle lh, Semaphore outstanding, CompletableFuture promise, + MessageId lastCompactedMessageId) { + if (promise.isDone()) { + return; + } + reader.readNextAsync().thenAcceptAsync(m -> { + if (promise.isDone()) { + m.close(); + return; + } + + if (m.getMessageId().compareTo(lastCompactedMessageId) <= 0) { + m.close(); + phaseTwoLoop(reader, to, latestForKey, lh, outstanding, promise, lastCompactedMessageId); + return; + } + + try { + MessageId id = m.getMessageId(); + Optional messageToAdd = Optional.empty(); + mxBean.addCompactionReadOp(reader.getTopic(), m.getHeadersAndPayload().readableBytes()); + MessageMetadata metadata = Commands.parseMessageMetadata(m.getHeadersAndPayload()); + if (Markers.isServerOnlyMarker(metadata)) { + messageToAdd = Optional.empty(); + } else if (RawBatchConverter.isReadableBatch(metadata)) { + try { + messageToAdd = rebatchMessage(reader.getTopic(), + m, (key, subid) -> subid.equals(latestForKey.get(key)), + topicCompactionRetainNullKey); + } catch (IOException ioe) { + log.info("Error decoding batch for message {}. Whole batch will be included in output", + id, ioe); + messageToAdd = Optional.of(m); + } + } else { + Pair keyAndSize = extractKeyAndSize(m); + MessageId msg; + if (keyAndSize == null) { + messageToAdd = topicCompactionRetainNullKey ? 
Optional.of(m) : Optional.empty(); + } else if ((msg = latestForKey.get(keyAndSize.getLeft())) != null + && msg.equals(id)) { // consider message only if present into latestForKey map + if (keyAndSize.getRight() <= 0) { + promise.completeExceptionally(new IllegalArgumentException( + "Compaction phase found empty record from sorted key-map")); + } + messageToAdd = Optional.of(m); + } + } + + if (messageToAdd.isPresent()) { + RawMessage message = messageToAdd.get(); + try { + outstanding.acquire(); + CompletableFuture addFuture = addToCompactedLedger(lh, message, reader.getTopic()) + .whenComplete((res, exception2) -> { + outstanding.release(); + if (exception2 != null) { + promise.completeExceptionally(exception2); + } + }); + if (to.equals(id)) { + // make sure all inflight writes have finished + outstanding.acquire(MAX_OUTSTANDING); + addFuture.whenComplete((res, exception2) -> { + if (exception2 == null) { + promise.complete(null); + } + }); + return; + } + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + promise.completeExceptionally(ie); + } finally { + if (message != m) { + message.close(); + } + } + } else if (to.equals(id)) { + // Reached to last-id and phase-one found it deleted-message while iterating on ledger so, + // not present under latestForKey. Complete the compaction. 
+ try { + // make sure all inflight writes have finished + outstanding.acquire(MAX_OUTSTANDING); + promise.complete(null); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + promise.completeExceptionally(e); + } + return; + } + phaseTwoLoop(reader, to, latestForKey, lh, outstanding, promise, m.getMessageId()); + } finally { + m.close(); + } + }, scheduler).exceptionally(ex -> { + promise.completeExceptionally(ex); + return null; + }); + } + + protected CompletableFuture createLedger(BookKeeper bk, + Map metadata) { + CompletableFuture bkf = new CompletableFuture<>(); + + try { + bk.asyncCreateLedger(conf.getManagedLedgerDefaultEnsembleSize(), + conf.getManagedLedgerDefaultWriteQuorum(), + conf.getManagedLedgerDefaultAckQuorum(), + Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, + Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD, + (rc, ledger, ctx) -> { + if (rc != BKException.Code.OK) { + bkf.completeExceptionally(BKException.create(rc)); + } else { + bkf.complete(ledger); + } + }, null, metadata); + } catch (Throwable t) { + log.error("Encountered unexpected error when creating compaction ledger", t); + return FutureUtil.failedFuture(t); + } + return bkf; + } + + protected CompletableFuture deleteLedger(BookKeeper bk, LedgerHandle lh) { + CompletableFuture bkf = new CompletableFuture<>(); + try { + bk.asyncDeleteLedger(lh.getId(), + (rc, ctx) -> { + if (rc != BKException.Code.OK) { + bkf.completeExceptionally(BKException.create(rc)); + } else { + bkf.complete(null); + } + }, null); + } catch (Throwable t) { + return FutureUtil.failedFuture(t); + } + return bkf; + } + + protected CompletableFuture closeLedger(LedgerHandle lh) { + CompletableFuture bkf = new CompletableFuture<>(); + try { + lh.asyncClose((rc, ledger, ctx) -> { + if (rc != BKException.Code.OK) { + bkf.completeExceptionally(BKException.create(rc)); + } else { + bkf.complete(null); + } + }, null); + } catch (Throwable t) { + return FutureUtil.failedFuture(t); + } + return bkf; + } + + 
private CompletableFuture addToCompactedLedger(LedgerHandle lh, RawMessage m, + String topic) { + CompletableFuture bkf = new CompletableFuture<>(); + ByteBuf serialized = m.serialize(); + try { + mxBean.addCompactionWriteOp(topic, m.getHeadersAndPayload().readableBytes()); + long start = System.nanoTime(); + lh.asyncAddEntry(serialized, + (rc, ledger, eid, ctx) -> { + mxBean.addCompactionLatencyOp(topic, System.nanoTime() - start, TimeUnit.NANOSECONDS); + if (rc != BKException.Code.OK) { + bkf.completeExceptionally(BKException.create(rc)); + } else { + bkf.complete(null); + } + }, null); + } catch (Throwable t) { + return FutureUtil.failedFuture(t); + } + return bkf; + } + + protected Pair extractKeyAndSize(RawMessage m) { + ByteBuf headersAndPayload = m.getHeadersAndPayload(); + MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload); + if (msgMetadata.hasPartitionKey()) { + int size = headersAndPayload.readableBytes(); + if (msgMetadata.hasUncompressedSize()) { + size = msgMetadata.getUncompressedSize(); + } + return Pair.of(msgMetadata.getPartitionKey(), size); + } else { + return null; + } + } + + + protected Optional rebatchMessage(String topic, RawMessage msg, + BiPredicate filter, + boolean retainNullKey) + throws IOException { + if (log.isDebugEnabled()) { + log.debug("Rebatching message {} for topic {}", msg.getMessageId(), topic); + } + return RawBatchConverter.rebatchMessage(msg, filter, retainNullKey); + } + + protected static class PhaseOneResult { + + final MessageId from; + final MessageId to; // last undeleted messageId + final MessageId lastReadId; // last read messageId + final Map latestForKey; + + PhaseOneResult(MessageId from, MessageId to, MessageId lastReadId, + Map latestForKey) { + this.from = from; + this.to = to; + this.lastReadId = lastReadId; + this.latestForKey = latestForKey; + } + } + + public long getPhaseOneLoopReadTimeoutInSeconds() { + return phaseOneLoopReadTimeout.getSeconds(); + } +} \ No newline at end 
of file diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java index fe77db33692b9..ba68e07cf5b0d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java @@ -172,7 +172,7 @@ public static void main(String[] args) throws Exception { @Cleanup PulsarClient pulsar = createClient(brokerConfig); - Compactor compactor = new TwoPhaseCompactor(brokerConfig, pulsar, bk, scheduler); + Compactor compactor = new PublishingOrderCompactor(brokerConfig, pulsar, bk, scheduler); long ledgerId = compactor.compact(arguments.topic).get(); log.info("Compaction of topic {} complete. Compacted to ledger {}", arguments.topic, ledgerId); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/EventTimeCompactionServiceFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/EventTimeCompactionServiceFactory.java new file mode 100644 index 0000000000000..383c7b1aeedd6 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/EventTimeCompactionServiceFactory.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.compaction; + +import org.apache.pulsar.broker.PulsarServerException; +import org.apache.pulsar.broker.PulsarService; + +public class EventTimeCompactionServiceFactory extends PulsarCompactionServiceFactory { + + @Override + protected Compactor newCompactor() throws PulsarServerException { + PulsarService pulsarService = getPulsarService(); + return new EventTimeOrderCompactor(pulsarService.getConfiguration(), + pulsarService.getClient(), pulsarService.getBookKeeperClient(), + pulsarService.getCompactorExecutor()); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/EventTimeOrderCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/EventTimeOrderCompactor.java new file mode 100644 index 0000000000000..2cd19ba15d608 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/EventTimeOrderCompactor.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.compaction; + +import io.netty.buffer.ByteBuf; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ScheduledExecutorService; +import java.util.stream.Collectors; +import org.apache.bookkeeper.client.BookKeeper; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.RawMessage; +import org.apache.pulsar.client.impl.RawBatchConverter; +import org.apache.pulsar.common.api.proto.MessageMetadata; +import org.apache.pulsar.common.protocol.Commands; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class EventTimeOrderCompactor extends AbstractTwoPhaseCompactor> { + + private static final Logger log = LoggerFactory.getLogger(EventTimeOrderCompactor.class); + + public EventTimeOrderCompactor(ServiceConfiguration conf, + PulsarClient pulsar, + BookKeeper bk, + ScheduledExecutorService scheduler) { + super(conf, pulsar, bk, scheduler); + } + + @Override + protected Map toLatestMessageIdForKey( + Map> latestForKey) { + return latestForKey.entrySet() + .stream() + .collect(Collectors.toMap( + Map.Entry::getKey, + entry -> entry.getValue().getLeft())); + } + + @Override + protected boolean compactMessage(String topic, Map> latestForKey, + RawMessage m, MessageId id) { + boolean deletedMessage = false; + boolean replaceMessage = false; + MessageCompactionData mcd = extractMessageCompactionData(m); + + if (mcd != null) { + boolean newer = Optional.ofNullable(latestForKey.get(mcd.key())) + .map(Pair::getRight) + .map(latestEventTime -> mcd.eventTime() != null + && mcd.eventTime() >= latestEventTime).orElse(true); + if (newer) { + if (mcd.payloadSize() > 0) { + Pair old = latestForKey.put(mcd.key(), + new 
ImmutablePair<>(mcd.messageId(), mcd.eventTime())); + replaceMessage = old != null; + } else { + deletedMessage = true; + latestForKey.remove(mcd.key()); + } + } + } else { + if (!topicCompactionRetainNullKey) { + deletedMessage = true; + } + } + if (replaceMessage || deletedMessage) { + mxBean.addCompactionRemovedEvent(topic); + } + return deletedMessage; + } + + @Override + protected boolean compactBatchMessage(String topic, Map> latestForKey, RawMessage m, + MessageMetadata metadata, MessageId id) { + boolean deletedMessage = false; + try { + int numMessagesInBatch = metadata.getNumMessagesInBatch(); + int deleteCnt = 0; + + for (MessageCompactionData mcd : extractMessageCompactionDataFromBatch(m)) { + if (mcd.key() == null) { + if (!topicCompactionRetainNullKey) { + // record delete null-key message event + deleteCnt++; + mxBean.addCompactionRemovedEvent(topic); + } + continue; + } + + boolean newer = Optional.ofNullable(latestForKey.get(mcd.key())) + .map(Pair::getRight) + .map(latestEventTime -> mcd.eventTime() != null + && mcd.eventTime() > latestEventTime).orElse(true); + if (newer) { + if (mcd.payloadSize() > 0) { + Pair old = latestForKey.put(mcd.key(), + new ImmutablePair<>(mcd.messageId(), mcd.eventTime())); + if (old != null) { + mxBean.addCompactionRemovedEvent(topic); + } + } else { + latestForKey.remove(mcd.key()); + deleteCnt++; + mxBean.addCompactionRemovedEvent(topic); + } + } + } + + if (deleteCnt == numMessagesInBatch) { + deletedMessage = true; + } + } catch (IOException ioe) { + log.info("Error decoding batch for message {}. 
Whole batch will be included in output", + id, ioe); + } + return deletedMessage; + } + + protected MessageCompactionData extractMessageCompactionData(RawMessage m) { + ByteBuf headersAndPayload = m.getHeadersAndPayload(); + MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload); + if (msgMetadata.hasPartitionKey()) { + int size = headersAndPayload.readableBytes(); + if (msgMetadata.hasUncompressedSize()) { + size = msgMetadata.getUncompressedSize(); + } + return new MessageCompactionData(m.getMessageId(), msgMetadata.getPartitionKey(), + size, msgMetadata.getEventTime()); + } else { + return null; + } + } + + private List extractMessageCompactionDataFromBatch(RawMessage msg) + throws IOException { + return RawBatchConverter.extractMessageCompactionData(msg); + } +} \ No newline at end of file diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/MessageCompactionData.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/MessageCompactionData.java new file mode 100644 index 0000000000000..03800273a806e --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/MessageCompactionData.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.compaction; + +import org.apache.pulsar.client.api.MessageId; + +public record MessageCompactionData (MessageId messageId, String key, Integer payloadSize, Long eventTime) {} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PublishingOrderCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PublishingOrderCompactor.java new file mode 100644 index 0000000000000..a825c0782fbf9 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PublishingOrderCompactor.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.compaction; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; +import org.apache.bookkeeper.client.BookKeeper; +import org.apache.commons.lang3.tuple.ImmutableTriple; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.RawMessage; +import org.apache.pulsar.client.impl.RawBatchConverter; +import org.apache.pulsar.common.api.proto.MessageMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class PublishingOrderCompactor extends AbstractTwoPhaseCompactor { + + private static final Logger log = LoggerFactory.getLogger(PublishingOrderCompactor.class); + + public PublishingOrderCompactor(ServiceConfiguration conf, + PulsarClient pulsar, + BookKeeper bk, + ScheduledExecutorService scheduler) { + super(conf, pulsar, bk, scheduler); + } + + @Override + protected Map toLatestMessageIdForKey(Map latestForKey) { + return latestForKey; + } + + @Override + protected boolean compactMessage(String topic, Map latestForKey, + RawMessage m, MessageId id) { + boolean deletedMessage = false; + boolean replaceMessage = false; + Pair keyAndSize = extractKeyAndSize(m); + if (keyAndSize != null) { + if (keyAndSize.getRight() > 0) { + MessageId old = latestForKey.put(keyAndSize.getLeft(), id); + replaceMessage = old != null; + } else { + deletedMessage = true; + latestForKey.remove(keyAndSize.getLeft()); + } + } else { + if (!topicCompactionRetainNullKey) { + deletedMessage = true; + } + } + if (replaceMessage || deletedMessage) { + mxBean.addCompactionRemovedEvent(topic); + } + return deletedMessage; + } + + @Override + protected boolean compactBatchMessage(String topic, Map latestForKey, + RawMessage m, MessageMetadata metadata, MessageId id) { + boolean 
deletedMessage = false; + try { + int numMessagesInBatch = metadata.getNumMessagesInBatch(); + int deleteCnt = 0; + for (ImmutableTriple e : extractIdsAndKeysAndSizeFromBatch( + m)) { + if (e != null) { + if (e.getMiddle() == null) { + if (!topicCompactionRetainNullKey) { + // record delete null-key message event + deleteCnt++; + mxBean.addCompactionRemovedEvent(topic); + } + continue; + } + if (e.getRight() > 0) { + MessageId old = latestForKey.put(e.getMiddle(), e.getLeft()); + if (old != null) { + mxBean.addCompactionRemovedEvent(topic); + } + } else { + latestForKey.remove(e.getMiddle()); + deleteCnt++; + mxBean.addCompactionRemovedEvent(topic); + } + } + } + if (deleteCnt == numMessagesInBatch) { + deletedMessage = true; + } + } catch (IOException ioe) { + log.info( + "Error decoding batch for message {}. Whole batch will be included in output", + id, ioe); + } + + return deletedMessage; + } + + protected List> extractIdsAndKeysAndSizeFromBatch( + RawMessage msg) + throws IOException { + return RawBatchConverter.extractIdsAndKeysAndSize(msg); + } + +} \ No newline at end of file diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarCompactionServiceFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarCompactionServiceFactory.java index 424733ad58158..90132461b4c4a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarCompactionServiceFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarCompactionServiceFactory.java @@ -54,7 +54,7 @@ public Compactor getNullableCompactor() { } protected Compactor newCompactor() throws PulsarServerException { - return new TwoPhaseCompactor(pulsarService.getConfiguration(), + return new PublishingOrderCompactor(pulsarService.getConfiguration(), pulsarService.getClient(), pulsarService.getBookKeeperClient(), pulsarService.getCompactorExecutor()); } diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java index fefa2ee959cc5..1b54092d9aa4f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/StrategicTwoPhaseCompactor.java @@ -59,7 +59,7 @@ *

As the first pass caches the entire message(not just offset) for each key into a map, * this compaction could be memory intensive if the message payload is large. */ -public class StrategicTwoPhaseCompactor extends TwoPhaseCompactor { +public class StrategicTwoPhaseCompactor extends PublishingOrderCompactor { private static final Logger log = LoggerFactory.getLogger(StrategicTwoPhaseCompactor.class); private static final int MAX_OUTSTANDING = 500; private static final int MAX_READER_RECONNECT_WAITING_TIME_IN_MILLIS = 20 * 1000; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java deleted file mode 100644 index 647c34a94ad81..0000000000000 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java +++ /dev/null @@ -1,470 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.pulsar.compaction; - -import io.netty.buffer.ByteBuf; -import java.io.IOException; -import java.time.Duration; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; -import java.util.function.BiPredicate; -import org.apache.bookkeeper.client.BKException; -import org.apache.bookkeeper.client.BookKeeper; -import org.apache.bookkeeper.client.LedgerHandle; -import org.apache.bookkeeper.mledger.impl.LedgerMetadataUtils; -import org.apache.commons.lang3.tuple.ImmutableTriple; -import org.apache.commons.lang3.tuple.Pair; -import org.apache.pulsar.broker.ServiceConfiguration; -import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.RawMessage; -import org.apache.pulsar.client.api.RawReader; -import org.apache.pulsar.client.impl.MessageIdImpl; -import org.apache.pulsar.client.impl.RawBatchConverter; -import org.apache.pulsar.common.api.proto.MessageMetadata; -import org.apache.pulsar.common.protocol.Commands; -import org.apache.pulsar.common.protocol.Markers; -import org.apache.pulsar.common.util.FutureUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Compaction will go through the topic in two passes. The first pass - * selects latest offset for each key in the topic. Then the second pass - * writes these values to a ledger. - * - *

The two passes are required to avoid holding the payloads of each of - * the latest values in memory, as the payload can be many orders of - * magnitude larger than a message id. -*/ -public class TwoPhaseCompactor extends Compactor { - private static final Logger log = LoggerFactory.getLogger(TwoPhaseCompactor.class); - private static final int MAX_OUTSTANDING = 500; - private final Duration phaseOneLoopReadTimeout; - private final boolean topicCompactionRetainNullKey; - - public TwoPhaseCompactor(ServiceConfiguration conf, - PulsarClient pulsar, - BookKeeper bk, - ScheduledExecutorService scheduler) { - super(conf, pulsar, bk, scheduler); - phaseOneLoopReadTimeout = Duration.ofSeconds(conf.getBrokerServiceCompactionPhaseOneLoopTimeInSeconds()); - topicCompactionRetainNullKey = conf.isTopicCompactionRetainNullKey(); - } - - @Override - protected CompletableFuture doCompaction(RawReader reader, BookKeeper bk) { - return reader.hasMessageAvailableAsync() - .thenCompose(available -> { - if (available) { - return phaseOne(reader).thenCompose( - (r) -> phaseTwo(reader, r.from, r.to, r.lastReadId, r.latestForKey, bk)); - } else { - log.info("Skip compaction of the empty topic {}", reader.getTopic()); - return CompletableFuture.completedFuture(-1L); - } - }); - } - - private CompletableFuture phaseOne(RawReader reader) { - Map latestForKey = new HashMap<>(); - CompletableFuture loopPromise = new CompletableFuture<>(); - - reader.getLastMessageIdAsync() - .thenAccept(lastMessageId -> { - log.info("Commencing phase one of compaction for {}, reading to {}", - reader.getTopic(), lastMessageId); - // Each entry is processed as a whole, discard the batchIndex part deliberately. 
- MessageIdImpl lastImpl = (MessageIdImpl) lastMessageId; - MessageIdImpl lastEntryMessageId = new MessageIdImpl(lastImpl.getLedgerId(), lastImpl.getEntryId(), - lastImpl.getPartitionIndex()); - phaseOneLoop(reader, Optional.empty(), Optional.empty(), lastEntryMessageId, latestForKey, - loopPromise); - }).exceptionally(ex -> { - loopPromise.completeExceptionally(ex); - return null; - }); - - return loopPromise; - } - - private void phaseOneLoop(RawReader reader, - Optional firstMessageId, - Optional toMessageId, - MessageId lastMessageId, - Map latestForKey, - CompletableFuture loopPromise) { - if (loopPromise.isDone()) { - return; - } - CompletableFuture future = reader.readNextAsync(); - FutureUtil.addTimeoutHandling(future, - phaseOneLoopReadTimeout, scheduler, - () -> FutureUtil.createTimeoutException("Timeout", getClass(), "phaseOneLoop(...)")); - - future.thenAcceptAsync(m -> { - try (m) { - MessageId id = m.getMessageId(); - boolean deletedMessage = false; - boolean replaceMessage = false; - mxBean.addCompactionReadOp(reader.getTopic(), m.getHeadersAndPayload().readableBytes()); - MessageMetadata metadata = Commands.parseMessageMetadata(m.getHeadersAndPayload()); - if (Markers.isServerOnlyMarker(metadata)) { - mxBean.addCompactionRemovedEvent(reader.getTopic()); - deletedMessage = true; - } else if (RawBatchConverter.isReadableBatch(metadata)) { - try { - int numMessagesInBatch = metadata.getNumMessagesInBatch(); - int deleteCnt = 0; - for (ImmutableTriple e : extractIdsAndKeysAndSizeFromBatch(m)) { - if (e != null) { - if (e.getMiddle() == null) { - if (!topicCompactionRetainNullKey) { - // record delete null-key message event - deleteCnt++; - mxBean.addCompactionRemovedEvent(reader.getTopic()); - } - continue; - } - if (e.getRight() > 0) { - MessageId old = latestForKey.put(e.getMiddle(), e.getLeft()); - if (old != null) { - mxBean.addCompactionRemovedEvent(reader.getTopic()); - } - } else { - latestForKey.remove(e.getMiddle()); - deleteCnt++; - 
mxBean.addCompactionRemovedEvent(reader.getTopic()); - } - } - } - if (deleteCnt == numMessagesInBatch) { - deletedMessage = true; - } - } catch (IOException ioe) { - log.info("Error decoding batch for message {}. Whole batch will be included in output", - id, ioe); - } - } else { - Pair keyAndSize = extractKeyAndSize(m); - if (keyAndSize != null) { - if (keyAndSize.getRight() > 0) { - MessageId old = latestForKey.put(keyAndSize.getLeft(), id); - replaceMessage = old != null; - } else { - deletedMessage = true; - latestForKey.remove(keyAndSize.getLeft()); - } - } else { - if (!topicCompactionRetainNullKey) { - deletedMessage = true; - } - } - if (replaceMessage || deletedMessage) { - mxBean.addCompactionRemovedEvent(reader.getTopic()); - } - } - MessageId first = firstMessageId.orElse(deletedMessage ? null : id); - MessageId to = deletedMessage ? toMessageId.orElse(null) : id; - if (id.compareTo(lastMessageId) == 0) { - loopPromise.complete(new PhaseOneResult(first == null ? id : first, to == null ? 
id : to, - lastMessageId, latestForKey)); - } else { - phaseOneLoop(reader, - Optional.ofNullable(first), - Optional.ofNullable(to), - lastMessageId, - latestForKey, loopPromise); - } - } - }, scheduler).exceptionally(ex -> { - loopPromise.completeExceptionally(ex); - return null; - }); - } - - private CompletableFuture phaseTwo(RawReader reader, MessageId from, MessageId to, MessageId lastReadId, - Map latestForKey, BookKeeper bk) { - Map metadata = - LedgerMetadataUtils.buildMetadataForCompactedLedger(reader.getTopic(), to.toByteArray()); - return createLedger(bk, metadata).thenCompose((ledger) -> { - log.info("Commencing phase two of compaction for {}, from {} to {}, compacting {} keys to ledger {}", - reader.getTopic(), from, to, latestForKey.size(), ledger.getId()); - return phaseTwoSeekThenLoop(reader, from, to, lastReadId, latestForKey, bk, ledger); - }); - } - - private CompletableFuture phaseTwoSeekThenLoop(RawReader reader, MessageId from, MessageId to, - MessageId lastReadId, Map latestForKey, BookKeeper bk, LedgerHandle ledger) { - CompletableFuture promise = new CompletableFuture<>(); - - reader.seekAsync(from).thenCompose((v) -> { - Semaphore outstanding = new Semaphore(MAX_OUTSTANDING); - CompletableFuture loopPromise = new CompletableFuture<>(); - phaseTwoLoop(reader, to, latestForKey, ledger, outstanding, loopPromise, MessageId.earliest); - return loopPromise; - }).thenCompose((v) -> closeLedger(ledger)) - .thenCompose((v) -> reader.acknowledgeCumulativeAsync(lastReadId, - Map.of(COMPACTED_TOPIC_LEDGER_PROPERTY, ledger.getId()))) - .whenComplete((res, exception) -> { - if (exception != null) { - deleteLedger(bk, ledger).whenComplete((res2, exception2) -> { - if (exception2 != null) { - log.warn("Cleanup of ledger {} for failed", ledger, exception2); - } - // complete with original exception - promise.completeExceptionally(exception); - }); - } else { - promise.complete(ledger.getId()); - } - }); - return promise; - } - - private void 
phaseTwoLoop(RawReader reader, MessageId to, Map latestForKey, - LedgerHandle lh, Semaphore outstanding, CompletableFuture promise, - MessageId lastCompactedMessageId) { - if (promise.isDone()) { - return; - } - reader.readNextAsync().thenAcceptAsync(m -> { - if (promise.isDone()) { - m.close(); - return; - } - - if (m.getMessageId().compareTo(lastCompactedMessageId) <= 0) { - m.close(); - phaseTwoLoop(reader, to, latestForKey, lh, outstanding, promise, lastCompactedMessageId); - return; - } - - try { - MessageId id = m.getMessageId(); - Optional messageToAdd = Optional.empty(); - mxBean.addCompactionReadOp(reader.getTopic(), m.getHeadersAndPayload().readableBytes()); - MessageMetadata metadata = Commands.parseMessageMetadata(m.getHeadersAndPayload()); - if (Markers.isServerOnlyMarker(metadata)) { - messageToAdd = Optional.empty(); - } else if (RawBatchConverter.isReadableBatch(metadata)) { - try { - messageToAdd = rebatchMessage(reader.getTopic(), - m, (key, subid) -> subid.equals(latestForKey.get(key)), topicCompactionRetainNullKey); - } catch (IOException ioe) { - log.info("Error decoding batch for message {}. Whole batch will be included in output", - id, ioe); - messageToAdd = Optional.of(m); - } - } else { - Pair keyAndSize = extractKeyAndSize(m); - MessageId msg; - if (keyAndSize == null) { - messageToAdd = topicCompactionRetainNullKey ? 
Optional.of(m) : Optional.empty(); - } else if ((msg = latestForKey.get(keyAndSize.getLeft())) != null - && msg.equals(id)) { // consider message only if present into latestForKey map - if (keyAndSize.getRight() <= 0) { - promise.completeExceptionally(new IllegalArgumentException( - "Compaction phase found empty record from sorted key-map")); - } - messageToAdd = Optional.of(m); - } - } - - if (messageToAdd.isPresent()) { - RawMessage message = messageToAdd.get(); - try { - outstanding.acquire(); - CompletableFuture addFuture = addToCompactedLedger(lh, message, reader.getTopic()) - .whenComplete((res, exception2) -> { - outstanding.release(); - if (exception2 != null) { - promise.completeExceptionally(exception2); - } - }); - if (to.equals(id)) { - // make sure all inflight writes have finished - outstanding.acquire(MAX_OUTSTANDING); - addFuture.whenComplete((res, exception2) -> { - if (exception2 == null) { - promise.complete(null); - } - }); - return; - } - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - promise.completeExceptionally(ie); - } finally { - if (message != m) { - message.close(); - } - } - } else if (to.equals(id)) { - // Reached to last-id and phase-one found it deleted-message while iterating on ledger so, - // not present under latestForKey. Complete the compaction. 
- try { - // make sure all inflight writes have finished - outstanding.acquire(MAX_OUTSTANDING); - promise.complete(null); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - promise.completeExceptionally(e); - } - return; - } - phaseTwoLoop(reader, to, latestForKey, lh, outstanding, promise, m.getMessageId()); - } finally { - m.close(); - } - }, scheduler).exceptionally(ex -> { - promise.completeExceptionally(ex); - return null; - }); - } - - protected CompletableFuture createLedger(BookKeeper bk, Map metadata) { - CompletableFuture bkf = new CompletableFuture<>(); - - try { - bk.asyncCreateLedger(conf.getManagedLedgerDefaultEnsembleSize(), - conf.getManagedLedgerDefaultWriteQuorum(), - conf.getManagedLedgerDefaultAckQuorum(), - Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, - Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD, - (rc, ledger, ctx) -> { - if (rc != BKException.Code.OK) { - bkf.completeExceptionally(BKException.create(rc)); - } else { - bkf.complete(ledger); - } - }, null, metadata); - } catch (Throwable t) { - log.error("Encountered unexpected error when creating compaction ledger", t); - return FutureUtil.failedFuture(t); - } - return bkf; - } - - protected CompletableFuture deleteLedger(BookKeeper bk, LedgerHandle lh) { - CompletableFuture bkf = new CompletableFuture<>(); - try { - bk.asyncDeleteLedger(lh.getId(), - (rc, ctx) -> { - if (rc != BKException.Code.OK) { - bkf.completeExceptionally(BKException.create(rc)); - } else { - bkf.complete(null); - } - }, null); - } catch (Throwable t) { - return FutureUtil.failedFuture(t); - } - return bkf; - } - - protected CompletableFuture closeLedger(LedgerHandle lh) { - CompletableFuture bkf = new CompletableFuture<>(); - try { - lh.asyncClose((rc, ledger, ctx) -> { - if (rc != BKException.Code.OK) { - bkf.completeExceptionally(BKException.create(rc)); - } else { - bkf.complete(null); - } - }, null); - } catch (Throwable t) { - return FutureUtil.failedFuture(t); - } - return bkf; - } - - 
private CompletableFuture addToCompactedLedger(LedgerHandle lh, RawMessage m, String topic) { - CompletableFuture bkf = new CompletableFuture<>(); - ByteBuf serialized = m.serialize(); - try { - mxBean.addCompactionWriteOp(topic, m.getHeadersAndPayload().readableBytes()); - long start = System.nanoTime(); - lh.asyncAddEntry(serialized, - (rc, ledger, eid, ctx) -> { - mxBean.addCompactionLatencyOp(topic, System.nanoTime() - start, TimeUnit.NANOSECONDS); - if (rc != BKException.Code.OK) { - bkf.completeExceptionally(BKException.create(rc)); - } else { - bkf.complete(null); - } - }, null); - } catch (Throwable t) { - return FutureUtil.failedFuture(t); - } - return bkf; - } - - protected Pair extractKeyAndSize(RawMessage m) { - ByteBuf headersAndPayload = m.getHeadersAndPayload(); - MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload); - if (msgMetadata.hasPartitionKey()) { - int size = headersAndPayload.readableBytes(); - if (msgMetadata.hasUncompressedSize()) { - size = msgMetadata.getUncompressedSize(); - } - return Pair.of(msgMetadata.getPartitionKey(), size); - } else { - return null; - } - } - - protected List> extractIdsAndKeysAndSizeFromBatch(RawMessage msg) - throws IOException { - return RawBatchConverter.extractIdsAndKeysAndSize(msg); - } - - protected Optional rebatchMessage(String topic, RawMessage msg, BiPredicate filter, - boolean retainNullKey) - throws IOException { - if (log.isDebugEnabled()) { - log.debug("Rebatching message {} for topic {}", msg.getMessageId(), topic); - } - return RawBatchConverter.rebatchMessage(msg, filter, retainNullKey); - } - - private static class PhaseOneResult { - final MessageId from; - final MessageId to; // last undeleted messageId - final MessageId lastReadId; // last read messageId - final Map latestForKey; - - PhaseOneResult(MessageId from, MessageId to, MessageId lastReadId, Map latestForKey) { - this.from = from; - this.to = to; - this.lastReadId = lastReadId; - this.latestForKey = 
latestForKey; - } - } - - public long getPhaseOneLoopReadTimeoutInSeconds() { - return phaseOneLoopReadTimeout.getSeconds(); - } -} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java index ac1ba6bc814b1..45dc30d21df64 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java @@ -62,7 +62,7 @@ public class CompactionRetentionTest extends MockedPulsarServiceBaseTest { protected ScheduledExecutorService compactionScheduler; protected BookKeeper bk; - private TwoPhaseCompactor compactor; + private PublishingOrderCompactor compactor; @BeforeMethod @Override @@ -79,7 +79,7 @@ public void setup() throws Exception { compactionScheduler = Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build()); bk = pulsar.getBookKeeperClientFactory().create(this.conf, null, null, Optional.empty(), null).get(); - compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); + compactor = new PublishingOrderCompactor(conf, pulsarClient, bk, compactionScheduler); } @AfterMethod(alwaysRun = true) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java index 0cf32859e3dd6..19f42a7e0570f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java @@ -109,7 +109,7 @@ public class CompactionTest extends MockedPulsarServiceBaseTest { protected ScheduledExecutorService compactionScheduler; protected BookKeeper bk; - private TwoPhaseCompactor compactor; + private PublishingOrderCompactor compactor; @BeforeMethod @Override @@ 
-124,7 +124,7 @@ public void setup() throws Exception { compactionScheduler = Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setNameFormat("compaction-%d").setDaemon(true).build()); bk = pulsar.getBookKeeperClientFactory().create(this.conf, null, null, Optional.empty(), null).get(); - compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); + compactor = new PublishingOrderCompactor(conf, pulsarClient, bk, compactionScheduler); } @AfterMethod(alwaysRun = true) @@ -147,7 +147,7 @@ protected long compact(String topic, CryptoKeyReader cryptoKeyReader) return compactor.compact(topic).get(); } - protected TwoPhaseCompactor getCompactor() { + protected PublishingOrderCompactor getCompactor() { return compactor; } @@ -656,7 +656,7 @@ public static Object[][] retainNullKey() { public void testKeyLessMessagesPassThrough(boolean retainNullKey) throws Exception { conf.setTopicCompactionRetainNullKey(retainNullKey); restartBroker(); - FieldUtils.writeDeclaredField(compactor, "topicCompactionRetainNullKey", retainNullKey, true); + FieldUtils.writeField(compactor, "topicCompactionRetainNullKey", retainNullKey, true); String topic = "persistent://my-property/use/my-ns/my-topic1"; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java index 1c09dc0d6434c..5cf7d33200d66 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java @@ -101,7 +101,7 @@ public void setup() throws Exception { new ThreadFactoryBuilder().setNameFormat("compactor").setDaemon(true).build()); bk = pulsar.getBookKeeperClientFactory().create( this.conf, null, null, Optional.empty(), null).get(); - compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); + compactor = new PublishingOrderCompactor(conf, pulsarClient, bk, 
compactionScheduler); } @@ -127,7 +127,7 @@ protected Compactor getCompactor() { return compactor; } - private List compactAndVerify(String topic, Map expected, boolean checkMetrics) + protected List compactAndVerify(String topic, Map expected, boolean checkMetrics) throws Exception { long compactedLedgerId = compact(topic); @@ -361,7 +361,7 @@ public void testPhaseOneLoopTimeConfiguration() { PulsarClientImpl mockClient = mock(PulsarClientImpl.class); ConnectionPool connectionPool = mock(ConnectionPool.class); when(mockClient.getCnxPool()).thenReturn(connectionPool); - TwoPhaseCompactor compactor = new TwoPhaseCompactor(configuration, mockClient, + PublishingOrderCompactor compactor = new PublishingOrderCompactor(configuration, mockClient, Mockito.mock(BookKeeper.class), compactionScheduler); Assert.assertEquals(compactor.getPhaseOneLoopReadTimeoutInSeconds(), 60); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/EventTimeOrderCompactorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/EventTimeOrderCompactorTest.java new file mode 100644 index 0000000000000..8fba0983123ee --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/EventTimeOrderCompactorTest.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.compaction; + +import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricDoubleSumValue; +import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import io.opentelemetry.api.common.Attributes; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import lombok.Cleanup; +import org.apache.bookkeeper.client.BookKeeper; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats; +import org.apache.pulsar.client.admin.LongRunningProcessStatus; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageRoutingMode; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.ConnectionPool; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes; +import org.awaitility.Awaitility; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +@Test(groups = "broker-compaction") +public class EventTimeOrderCompactorTest extends CompactorTest { + + private EventTimeOrderCompactor compactor; + + @BeforeMethod + @Override + public void setup() throws Exception { + super.setup(); + compactor = new EventTimeOrderCompactor(conf, pulsarClient, bk, compactionScheduler); + } + + @Override + protected long 
compact(String topic) throws ExecutionException, InterruptedException { + return compactor.compact(topic).get(); + } + + @Override + protected Compactor getCompactor() { + return compactor; + } + + @Test + public void testCompactedOutByEventTime() throws Exception { + String topicName = BrokerTestUtil.newUniqueName("persistent://my-property/use/my-ns/testCompactedOutByEventTime"); + this.restartBroker(); + + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.STRING) + .enableBatching(true).topic(topicName).batchingMaxMessages(3).create(); + + producer.newMessage().key("K1").value("V1").eventTime(1L).sendAsync(); + producer.newMessage().key("K2").value("V2").eventTime(1L).sendAsync(); + producer.newMessage().key("K2").value(null).eventTime(2L).sendAsync(); + producer.flush(); + + admin.topics().triggerCompaction(topicName); + + Awaitility.await().untilAsserted(() -> { + Assert.assertEquals(admin.topics().compactionStatus(topicName).status, + LongRunningProcessStatus.Status.SUCCESS); + }); + + var attributes = Attributes.builder() + .put(OpenTelemetryAttributes.PULSAR_DOMAIN, "persistent") + .put(OpenTelemetryAttributes.PULSAR_TENANT, "my-property") + .put(OpenTelemetryAttributes.PULSAR_NAMESPACE, "my-property/use/my-ns") + .put(OpenTelemetryAttributes.PULSAR_TOPIC, topicName) + .build(); + var metrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(); + assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.COMPACTION_REMOVED_COUNTER, attributes, 1); + assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.COMPACTION_OPERATION_COUNTER, Attributes.builder() + .putAll(attributes) + .put(OpenTelemetryAttributes.PULSAR_COMPACTION_STATUS, "success") + .build(), + 1); + assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.COMPACTION_OPERATION_COUNTER, Attributes.builder() + .putAll(attributes) + .put(OpenTelemetryAttributes.PULSAR_COMPACTION_STATUS, "failure") + .build(), + 0); + assertMetricDoubleSumValue(metrics, 
OpenTelemetryTopicStats.COMPACTION_DURATION_SECONDS, attributes, + actual -> assertThat(actual).isPositive()); + assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.COMPACTION_BYTES_IN_COUNTER, attributes, + actual -> assertThat(actual).isPositive()); + assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.COMPACTION_BYTES_OUT_COUNTER, attributes, + actual -> assertThat(actual).isPositive()); + assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.COMPACTION_ENTRIES_COUNTER, attributes, 1); + assertMetricLongSumValue(metrics, OpenTelemetryTopicStats.COMPACTION_BYTES_COUNTER, attributes, + actual -> assertThat(actual).isPositive()); + + producer.newMessage().key("K1").eventTime(2L).value("V1-2").sendAsync(); + producer.flush(); + + admin.topics().triggerCompaction(topicName); + + Awaitility.await().untilAsserted(() -> { + Assert.assertEquals(admin.topics().compactionStatus(topicName).status, + LongRunningProcessStatus.Status.SUCCESS); + }); + + @Cleanup + Reader reader = pulsarClient.newReader(Schema.STRING) + .subscriptionName("reader-test") + .topic(topicName) + .readCompacted(true) + .startMessageId(MessageId.earliest) + .create(); + while (reader.hasMessageAvailable()) { + Message message = reader.readNext(3, TimeUnit.SECONDS); + Assert.assertEquals(message.getEventTime(), 2L); + } + } + + @Test + public void testCompactWithEventTimeAddCompact() throws Exception { + String topic = "persistent://my-property/use/my-ns/my-topic1"; + + @Cleanup + Producer producer = pulsarClient.newProducer().topic(topic) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); + + Map expected = new HashMap<>(); + + producer.newMessage() + .key("a") + .eventTime(1L) + .value("A_1".getBytes()) + .send(); + producer.newMessage() + .key("b") + .eventTime(1L) + .value("B_1".getBytes()) + .send(); + producer.newMessage() + .key("a") + .eventTime(2L) + .value("A_2".getBytes()) + .send(); + expected.put("a", "A_2".getBytes()); + 
expected.put("b", "B_1".getBytes()); + + compactAndVerify(topic, new HashMap<>(expected), false); + + producer.newMessage() + .key("b") + .eventTime(2L) + .value("B_2".getBytes()) + .send(); + expected.put("b", "B_2".getBytes()); + + compactAndVerify(topic, expected, false); + } + + @Override + @Test + public void testPhaseOneLoopTimeConfiguration() { + ServiceConfiguration configuration = new ServiceConfiguration(); + configuration.setBrokerServiceCompactionPhaseOneLoopTimeInSeconds(60); + PulsarClientImpl mockClient = mock(PulsarClientImpl.class); + ConnectionPool connectionPool = mock(ConnectionPool.class); + when(mockClient.getCnxPool()).thenReturn(connectionPool); + EventTimeOrderCompactor compactor = new EventTimeOrderCompactor(configuration, mockClient, + Mockito.mock(BookKeeper.class), compactionScheduler); + Assert.assertEquals(compactor.getPhaseOneLoopReadTimeoutInSeconds(), 60); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionTest.java index 54563431052eb..d1ff46cbc02d5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionTest.java @@ -74,7 +74,7 @@ protected long compact(String topic, CryptoKeyReader cryptoKeyReader) } @Override - protected TwoPhaseCompactor getCompactor() { + protected PublishingOrderCompactor getCompactor() { return compactor; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java index 2aa09309d3931..9f33479ce4cab 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java @@ -53,7 +53,7 @@ public class 
TopicCompactionServiceTest extends MockedPulsarServiceBaseTest { protected ScheduledExecutorService compactionScheduler; protected BookKeeper bk; - private TwoPhaseCompactor compactor; + private PublishingOrderCompactor compactor; @BeforeMethod @Override @@ -73,7 +73,7 @@ public void setup() throws Exception { new ThreadFactoryBuilder().setNameFormat("compactor").setDaemon(true).build()); bk = pulsar.getBookKeeperClientFactory().create( this.conf, null, null, Optional.empty(), null).get(); - compactor = new TwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler); + compactor = new PublishingOrderCompactor(conf, pulsarClient, bk, compactionScheduler); } @AfterMethod(alwaysRun = true) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java index ab4925bfeb8de..74c2a93b84e9f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java @@ -62,7 +62,7 @@ import org.apache.pulsar.common.policies.data.SubscriptionStats; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.policies.data.TopicStats; -import org.apache.pulsar.compaction.TwoPhaseCompactor; +import org.apache.pulsar.compaction.PublishingOrderCompactor; import org.apache.pulsar.functions.api.Context; import org.apache.pulsar.functions.instance.InstanceUtils; import org.apache.pulsar.functions.utils.FunctionCommon; @@ -259,7 +259,7 @@ public void testReadCompactedFunction() throws Exception { @Cleanup("shutdownNow") ScheduledExecutorService compactionScheduler = Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setNameFormat("compactor").setDaemon(true).build()); - TwoPhaseCompactor twoPhaseCompactor = new TwoPhaseCompactor(config, + PublishingOrderCompactor twoPhaseCompactor = new PublishingOrderCompactor(config, pulsarClient, 
pulsar.getBookKeeperClient(), compactionScheduler); twoPhaseCompactor.compact(sourceTopic).get(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java index 7edc87bb996d4..be2b377a9cff5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java @@ -49,7 +49,7 @@ import org.apache.pulsar.common.policies.data.SinkStatus; import org.apache.pulsar.common.policies.data.SubscriptionStats; import org.apache.pulsar.common.policies.data.TopicStats; -import org.apache.pulsar.compaction.TwoPhaseCompactor; +import org.apache.pulsar.compaction.PublishingOrderCompactor; import org.apache.pulsar.functions.LocalRunner; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.functions.utils.FunctionCommon; @@ -107,7 +107,7 @@ public void testReadCompactedSink() throws Exception { @Cleanup("shutdownNow") ScheduledExecutorService compactionScheduler = Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setNameFormat("compactor").setDaemon(true).build()); - TwoPhaseCompactor twoPhaseCompactor = new TwoPhaseCompactor(config, + PublishingOrderCompactor twoPhaseCompactor = new PublishingOrderCompactor(config, pulsarClient, pulsar.getBookKeeperClient(), compactionScheduler); twoPhaseCompactor.compact(sourceTopic).get();