Skip to content

Commit

Permalink
[PIP 70][Issue 8617] Introduce lightweight broker entry metadata (#8618)
Browse files Browse the repository at this point in the history
Fixes #8617

## Motivation
Introduce lightweight raw Message metadata, details can be found PIP-70

## Modifications
wire protocol add RawMessageMetadata and supports_raw_message_meta for FeatureFlags
change how produced message is saved in bookkeeper: add raw metadata for message
change how message is seek-by-time
change how message send back to Consumer: skip metadata if consumer not supprot raw metadata
Verifying this change
Added tests for parse/skip raw message metadata
Added test for how message seek-by broker timestamp for message
  • Loading branch information
aloyszhang authored Dec 12, 2020
1 parent b63e288 commit 6275297
Show file tree
Hide file tree
Showing 27 changed files with 1,261 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -636,4 +636,5 @@ public int getNewEntriesCheckDelayInMillis() {
public void setNewEntriesCheckDelayInMillis(int newEntriesCheckDelayInMillis) {
this.newEntriesCheckDelayInMillis = newEntriesCheckDelayInMillis;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

import io.netty.util.ReferenceCountUtil;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
import org.apache.bookkeeper.client.BKException;
Expand Down Expand Up @@ -103,6 +105,7 @@ public void setCloseWhenDone(boolean closeWhenDone) {

public void initiate() {
if (STATE_UPDATER.compareAndSet(OpAddEntry.this, State.OPEN, State.INITIATED)) {

ByteBuf duplicateBuffer = data.retainedDuplicate();

// internally asyncAddEntry() will take the ownership of the buffer and release it at the end
Expand All @@ -117,7 +120,7 @@ public void initiate() {
public void failed(ManagedLedgerException e) {
AddEntryCallback cb = callbackUpdater.getAndSet(this, null);
if (cb != null) {
data.release();
ReferenceCountUtil.release(data);
cb.addFailed(e, ctx);
ml.mbean.recordAddEntryError();
}
Expand Down Expand Up @@ -176,7 +179,7 @@ public void safeRun() {
}

// We are done using the byte buffer
data.release();
ReferenceCountUtil.release(data);

PositionImpl lastEntry = PositionImpl.get(ledger.getId(), entryId);
ManagedLedgerImpl.ENTRIES_ADDED_COUNTER_UPDATER.incrementAndGet(ml);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.netty.util.internal.PlatformDependent;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
Expand Down Expand Up @@ -865,6 +866,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
"please enable the system topic first.")
private boolean topicLevelPoliciesEnabled = false;

@FieldContext(
category = CATEGORY_SERVER,
doc = "List of interceptors for entry metadata.")
private Set<String> brokerEntryMetadataInterceptors = new HashSet<>();

/***** --- TLS --- ****/
@FieldContext(
category = CATEGORY_TLS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.configuration.FieldContext;
import org.apache.pulsar.common.intercept.BrokerEntryMetadataInterceptor;
import org.apache.pulsar.common.intercept.BrokerEntryMetadataUtils;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundles;
Expand Down Expand Up @@ -251,6 +253,8 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
private volatile boolean reachMessagePublishBufferThreshold;
private BrokerInterceptor interceptor;

private Set<BrokerEntryMetadataInterceptor> brokerEntryMetadataInterceptors;

public BrokerService(PulsarService pulsar) throws Exception {
this.pulsar = pulsar;
this.maxMessagePublishBufferBytes = pulsar.getConfiguration().getMaxMessagePublishBufferSizeInMB() > 0
Expand Down Expand Up @@ -358,6 +362,10 @@ public Map<String, String> deserialize(String key, byte[] content) throws Except
.supplier(() -> pulsar.getConfig().getMaxConcurrentTopicLoadRequest()
- topicLoadRequestSemaphore.get().availablePermits())
.register();

this.brokerEntryMetadataInterceptors = BrokerEntryMetadataUtils
.loadBrokerEntryMetadataInterceptors(pulsar.getConfiguration().getBrokerEntryMetadataInterceptors(),
BrokerService.class.getClassLoader());
}

// This call is used for starting additional protocol handlers
Expand Down Expand Up @@ -2499,4 +2507,13 @@ private void checkTopicLevelPolicyEnable() {
public void setInterceptor(BrokerInterceptor interceptor) {
this.interceptor = interceptor;
}

public Set<BrokerEntryMetadataInterceptor> getBrokerEntryMetadataInterceptors() {
return brokerEntryMetadataInterceptors;
}

public boolean isBrokerEntryMetadataEnabled() {
return brokerEntryMetadataInterceptors.size() > 0;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ public TopicClosedException(Throwable t) {
}
}

public static class AddEntryMetadataException extends BrokerServiceException {
public AddEntryMetadataException(Throwable t) {
super(t);
}
}

public static class PersistenceException extends BrokerServiceException {
public PersistenceException(Throwable t) {
super(t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,8 @@ public ChannelPromise sendMessagesToConsumer(long consumerId, String topicName,
// increment ref-count of data and release at the end of process:
// so, we can get chance to call entry.release
metadataAndPayload.retain();
// skip raw message metadata since broker timestamp only used in broker side
Commands.skipBrokerEntryMetadataIfExist(metadataAndPayload);
// skip checksum by incrementing reader-index if consumer-client doesn't support checksum verification
if (cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v11.getNumber()) {
Commands.skipChecksumIfPresent(metadataAndPayload);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2185,6 +2185,11 @@ boolean supportsAuthenticationRefresh() {
return features != null && features.getSupportsAuthRefresh();
}


boolean supportBrokerMetadata() {
return features != null && features.getSupportsBrokerEntryMetadata();
}

@Override
public String getClientVersion() {
return clientVersion;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void sendMessage(Entry entry) {
ByteBuf headersAndPayload = entry.getDataBuffer();
MessageImpl msg;
try {
msg = MessageImpl.deserialize(headersAndPayload);
msg = MessageImpl.deserializeSkipBrokerEntryMetaData(headersAndPayload);
} catch (Throwable t) {
log.error("[{}][{} -> {}] Failed to deserialize message at {} (buffer size: {}): {}", topicName,
localCluster, remoteCluster, entry.getPosition(), length, t.getMessage(), t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ public void expireMessages(int messageTTLInSeconds) {
messageTTLInSeconds);

cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, entry -> {
MessageImpl<?> msg = null;
MessageImpl<byte[]> msg = null;
try {
msg = MessageImpl.deserialize(entry.getDataBuffer());
msg = MessageImpl.deserializeBrokerEntryMetaDataFirst(entry.getDataBuffer());
return msg.isExpired(messageTTLInSeconds);
} catch (Exception e) {
log.error("[{}][{}] Error deserializing message for expiry check", topicName, subName, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@ public void findMessages(final long timestamp, AsyncCallbacks.FindEntryCallback
}

cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries, entry -> {
MessageImpl msg = null;
MessageImpl<byte[]> msg = null;
try {
msg = MessageImpl.deserialize(entry.getDataBuffer());
return msg.getPublishTime() < timestamp;
msg = MessageImpl.deserializeBrokerEntryMetaDataFirst(entry.getDataBuffer());
return msg.publishedEarlierThan(timestamp);
} catch (Exception e) {
log.error("[{}][{}] Error deserializing message for message position find", topicName, subName, e);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ public void readEntriesComplete(List<Entry> entries, Object ctx) {
ByteBuf headersAndPayload = entry.getDataBuffer();
MessageImpl msg;
try {
msg = MessageImpl.deserialize(headersAndPayload);
msg = MessageImpl.deserializeSkipBrokerEntryMetaData(headersAndPayload);
} catch (Throwable t) {
log.error("[{}][{} -> {}] Failed to deserialize message at {} (buffer size: {}): {}", topicName,
localCluster, remoteCluster, entry.getPosition(), length, t.getMessage(), t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,10 @@ public void publishMessage(ByteBuf headersAndPayload, PublishContext publishCont
messageDeduplication.isDuplicate(publishContext, headersAndPayload);
switch (status) {
case NotDup:
ledger.asyncAddEntry(headersAndPayload, this, publishContext);
// intercept headersAndPayload and add entry metadata
if (appendBrokerEntryMetadata(headersAndPayload, publishContext)) {
ledger.asyncAddEntry(headersAndPayload, this, publishContext);
}
break;
case Dup:
// Immediately acknowledge duplicated message
Expand All @@ -371,6 +374,24 @@ public void publishMessage(ByteBuf headersAndPayload, PublishContext publishCont
}
}

private boolean appendBrokerEntryMetadata(ByteBuf headersAndPayload, PublishContext publishContext) {
// just return true if BrokerEntryMetadata is not enabled
if (!brokerService.isBrokerEntryMetadataEnabled()) {
return true;
}

try {
headersAndPayload = Commands.addBrokerEntryMetadata(headersAndPayload,
brokerService.getBrokerEntryMetadataInterceptors());
} catch (Exception e) {
decrementPendingWriteOpsAndCheck();
publishContext.completed(new BrokerServiceException.AddEntryMetadataException(e), -1, -1);
log.error("[{}] Failed to add broker entry metadata.", topic, e);
return false;
}
return true;
}

public void asyncReadEntry(PositionImpl position, AsyncCallbacks.ReadEntryCallback callback, Object ctx) {
if (ledger instanceof ManagedLedgerImpl) {
((ManagedLedgerImpl) ledger).asyncReadEntry(position, callback, ctx);
Expand Down Expand Up @@ -2145,16 +2166,17 @@ public void terminateFailed(ManagedLedgerException exception, Object ctx) {
return future;
}

public boolean isOldestMessageExpired(ManagedCursor cursor, long messageTTLInSeconds) {
MessageImpl msg = null;
public boolean isOldestMessageExpired(ManagedCursor cursor, int messageTTLInSeconds) {
MessageImpl<byte[]> msg = null;
Entry entry = null;
boolean isOldestMessageExpired = false;
try {
entry = cursor.getNthEntry(1, IndividualDeletedEntries.Include);
if (entry != null) {
msg = MessageImpl.deserialize(entry.getDataBuffer());
isOldestMessageExpired = messageTTLInSeconds != 0 && System.currentTimeMillis() > (msg.getPublishTime()
+ TimeUnit.SECONDS.toMillis((long) (messageTTLInSeconds * MESSAGE_EXPIRY_THRESHOLD)));
msg = MessageImpl.deserializeBrokerEntryMetaDataFirst(entry.getDataBuffer());
if (messageTTLInSeconds != 0) {
isOldestMessageExpired = msg.isExpired((int) (messageTTLInSeconds * MESSAGE_EXPIRY_THRESHOLD));
}
}
} catch (Exception e) {
log.warn("[{}] Error while getting the oldest message", topic, e);
Expand Down
Loading

0 comments on commit 6275297

Please sign in to comment.