diff --git a/kafka/channel/pkg/dispatcher/dispatcher.go b/kafka/channel/pkg/dispatcher/dispatcher.go index c2ee34661d..561e91ae45 100644 --- a/kafka/channel/pkg/dispatcher/dispatcher.go +++ b/kafka/channel/pkg/dispatcher/dispatcher.go @@ -81,7 +81,9 @@ type consumerMessageHandler struct { func (c consumerMessageHandler) Handle(ctx context.Context, message *sarama.ConsumerMessage) (bool, error) { event := fromKafkaMessage(ctx, message) - return true, c.dispatcher.DispatchEventWithDelivery(ctx, *event, c.sub.SubscriberURI, c.sub.ReplyURI, &c.sub.Delivery) + err := c.dispatcher.DispatchEventWithDelivery(ctx, *event, c.sub.SubscriberURI, c.sub.ReplyURI, &c.sub.Delivery) + // NOTE: only return `true` here if DispatchEventWithDelivery actually delivered the message. + return err == nil, err } var _ kafka.KafkaConsumerHandler = (*consumerMessageHandler)(nil)