Skip to content

Commit

Permalink
[fix][broker] Override onMessagePublish in BrokerInterceptors (#3)
Browse files Browse the repository at this point in the history
Signed-off-by: Zixuan Liu <[email protected]>
  • Loading branch information
nodece authored Nov 4, 2023
1 parent 7647f4d commit 4895685
Showing 1 changed file with 11 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.Topic.PublishContext;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.MessageMetadata;
Expand Down Expand Up @@ -133,6 +134,16 @@ public void producerCreated(ServerCnx cnx, Producer producer,
}
}

@Override
public void onMessagePublish(Producer producer, ByteBuf headersAndPayload, PublishContext publishContext) {
if (interceptors == null || interceptors.isEmpty()) {
return;
}
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
value.onMessagePublish(producer, headersAndPayload, publishContext);
}
}

@Override
public void messageProduced(ServerCnx cnx, Producer producer, long startTimeNs, long ledgerId,
long entryId, Topic.PublishContext publishContext) {
Expand Down

0 comments on commit 4895685

Please sign in to comment.