Skip to content

Commit

Permalink
Address comment
Browse files Browse the repository at this point in the history
  • Loading branch information
coderzc committed Dec 19, 2023
1 parent 9b2709e commit 56ba96a
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -196,16 +196,16 @@ public int filterEntriesForConsumer(@Nullable MessageMetadata[] metadataArray, i
}

if (msgMetadata == null || (Markers.isServerOnlyMarker(msgMetadata))) {
PositionImpl pos = (PositionImpl) entry.getPosition();
// Message metadata was corrupted or the messages was a server-only marker

if (Markers.isReplicatedSubscriptionSnapshotMarker(msgMetadata)) {
processReplicatedSubscriptionSnapshot(pos, metadataAndPayload);
}

// Deliver marker to __compaction cursor to avoid compaction task stuck,
// and filter out them when doing topic compaction.
if (cursor == null || !cursor.getName().equals(Compactor.COMPACTION_SUBSCRIPTION)) {
PositionImpl pos = (PositionImpl) entry.getPosition();
// Message metadata was corrupted or the messages was a server-only marker

if (Markers.isReplicatedSubscriptionSnapshotMarker(msgMetadata)) {
processReplicatedSubscriptionSnapshot(pos, metadataAndPayload);
}

entries.set(i, null);
entry.release();
individualAcknowledgeMessageIfNeeded(Collections.singletonList(pos),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map<String, MessageId>
Optional<RawMessage> messageToAdd = Optional.empty();
mxBean.addCompactionReadOp(reader.getTopic(), m.getHeadersAndPayload().readableBytes());
MessageMetadata metadata = Commands.parseMessageMetadata(m.getHeadersAndPayload());
if (Markers.isTxnMarker(metadata)) {
if (Markers.isServerOnlyMarker(metadata)) {
messageToAdd = Optional.empty();
} else if (RawBatchConverter.isReadableBatch(metadata)) {
try {
Expand Down

0 comments on commit 56ba96a

Please sign in to comment.