Skip to content

Commit

Permalink
[feat] PIP-352: Event time based compaction (apache#22517)
Browse files Browse the repository at this point in the history
  • Loading branch information
marekczajkowski committed Aug 23, 2024
1 parent 44f9860 commit 1c495e1
Show file tree
Hide file tree
Showing 20 changed files with 1,037 additions and 500 deletions.
6 changes: 6 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,12 @@ brokerServiceCompactionPhaseOneLoopTimeInSeconds=30
# Whether retain null-key message during topic compaction
topicCompactionRetainNullKey=false

# Class name of the factory that implements the topic compaction service.
# If value is "org.apache.pulsar.compaction.EventTimeCompactionServiceFactory",
# will create topic compaction service based on message eventTime.
# By default compaction service is based on message publishing order.
compactionServiceFactoryClassName=org.apache.pulsar.compaction.PulsarCompactionServiceFactory

# Whether to enable the delayed delivery for messages.
# If disabled, messages will be immediately delivered and there will
# be no tracking overhead.
Expand Down
6 changes: 6 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1318,3 +1318,9 @@ disableBrokerInterceptors=true

# Whether retain null-key message during topic compaction
topicCompactionRetainNullKey=false

# Class name of the factory that implements the topic compaction service.
# If value is "org.apache.pulsar.compaction.EventTimeCompactionServiceFactory",
# will create topic compaction service based on message eventTime.
# By default compaction service is based on message publishing order.
compactionServiceFactoryClassName=org.apache.pulsar.compaction.PulsarCompactionServiceFactory
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.compaction.MessageCompactionData;

public class RawBatchConverter {

Expand All @@ -51,8 +52,8 @@ public static boolean isReadableBatch(MessageMetadata metadata) {
return metadata.hasNumMessagesInBatch() && metadata.getEncryptionKeysCount() == 0;
}

public static List<ImmutableTriple<MessageId, String, Integer>> extractIdsAndKeysAndSize(RawMessage msg)
throws IOException {
public static List<MessageCompactionData> extractMessageCompactionData(RawMessage msg)
throws IOException {
checkArgument(msg.getMessageIdData().getBatchIndex() == -1);

ByteBuf payload = msg.getHeadersAndPayload();
Expand All @@ -64,25 +65,35 @@ public static List<ImmutableTriple<MessageId, String, Integer>> extractIdsAndKey
int uncompressedSize = metadata.getUncompressedSize();
ByteBuf uncompressedPayload = codec.decode(payload, uncompressedSize);

List<ImmutableTriple<MessageId, String, Integer>> idsAndKeysAndSize = new ArrayList<>();
List<MessageCompactionData> messageCompactionDataList = new ArrayList<>();

SingleMessageMetadata smm = new SingleMessageMetadata();
for (int i = 0; i < batchSize; i++) {
ByteBuf singleMessagePayload = Commands.deSerializeSingleMessageInBatch(uncompressedPayload,
smm,
0, batchSize);
smm,
0, batchSize);
MessageId id = new BatchMessageIdImpl(msg.getMessageIdData().getLedgerId(),
msg.getMessageIdData().getEntryId(),
msg.getMessageIdData().getPartition(),
i);
msg.getMessageIdData().getEntryId(),
msg.getMessageIdData().getPartition(),
i);
if (!smm.isCompactedOut()) {
idsAndKeysAndSize.add(ImmutableTriple.of(id,
smm.hasPartitionKey() ? smm.getPartitionKey() : null,
smm.hasPayloadSize() ? smm.getPayloadSize() : 0));
messageCompactionDataList.add(new MessageCompactionData(id,
smm.hasPartitionKey() ? smm.getPartitionKey() : null,
smm.hasPayloadSize() ? smm.getPayloadSize() : 0, smm.getEventTime()));
}
singleMessagePayload.release();
}
uncompressedPayload.release();
return messageCompactionDataList;
}

public static List<ImmutableTriple<MessageId, String, Integer>> extractIdsAndKeysAndSize(
RawMessage msg)
throws IOException {
List<ImmutableTriple<MessageId, String, Integer>> idsAndKeysAndSize = new ArrayList<>();
for (MessageCompactionData mcd : extractMessageCompactionData(msg)) {
idsAndKeysAndSize.add(ImmutableTriple.of(mcd.messageId(), mcd.key(), mcd.payloadSize()));
}
return idsAndKeysAndSize;
}

Expand Down
Loading

0 comments on commit 1c495e1

Please sign in to comment.