This repository has been archived by the owner on Jun 4, 2021. It is now read-only.

Kafka and Natss Channels Redelivery #1114

Closed
wants to merge 3 commits into from

Changes from 1 commit
2 changes: 2 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default.

41 changes: 10 additions & 31 deletions kafka/channel/pkg/dispatcher/dispatcher.go
@@ -27,7 +27,6 @@ import (
"github.com/Shopify/sarama"
"github.com/cloudevents/sdk-go/v2/binding"
protocolkafka "github.com/cloudevents/sdk-go/v2/protocol/kafka_sarama"
"github.com/google/go-cmp/cmp"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/types"
eventingduck "knative.dev/eventing/pkg/apis/duck/v1beta1"
@@ -40,8 +39,6 @@ import (
)

type KafkaDispatcher struct {
// TODO: config doesn't have to be atomic as it is read and updated using updateLock.
config atomic.Value
hostToChannelMap atomic.Value
// hostToChannelMapLock is used to update hostToChannelMap
hostToChannelMapLock sync.Mutex
@@ -78,20 +75,17 @@ func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispat
return nil, fmt.Errorf("unable to create kafka producer: %v", err)
}

messageSender, err := kncloudevents.NewHttpMessageSender(args.KnCEConnectionArgs, "")
if err != nil {
args.Logger.Fatal("failed to create message sender", zap.Error(err))
}

dispatcher := &KafkaDispatcher{
dispatcher: eventingchannels.NewMessageDispatcher(args.Logger),
dispatcher: eventingchannels.NewMessageDispatcherFromConfig(
args.Logger,
eventingchannels.EventDispatcherConfig{ConnectionArgs: *args.KnCEConnectionArgs},
),
kafkaConsumerFactory: kafka.NewConsumerGroupFactory(client),
channelSubscriptions: make(map[eventingchannels.ChannelReference][]types.UID),
subsConsumerGroups: make(map[types.UID]sarama.ConsumerGroup),
subscriptions: make(map[types.UID]subscription),
kafkaAsyncProducer: producer,
logger: args.Logger,
messageSender: messageSender,
topicFunc: args.TopicFunc,
}
receiverFunc, err := eventingchannels.NewMessageReceiver(
@@ -116,7 +110,6 @@ func NewDispatcher(ctx context.Context, args *KafkaDispatcherArgs) (*KafkaDispat
}

dispatcher.receiver = receiverFunc
dispatcher.setConfig(&multichannelfanout.Config{})
dispatcher.setHostToChannelMap(map[string]eventingchannels.ChannelReference{})
return dispatcher, nil
}
@@ -167,7 +160,7 @@ func (c consumerMessageHandler) Handle(ctx context.Context, consumerMessage *sar
zap.String("topic", consumerMessage.Topic),
zap.String("sub", string(c.sub.UID)),
)
err := c.dispatcher.DispatchMessage(context.Background(), message, nil, destination, reply, deadLetter)
err := c.dispatcher.DispatchMessage(ctx, message, nil, destination, reply, deadLetter)
// NOTE: only return `true` here if DispatchMessage actually delivered the message.
return err == nil, err
}
@@ -180,15 +173,8 @@ type subscription struct {
Name string
}

// configDiff diffs the new config with the existing config. If there are no differences, then the
// empty string is returned. If there are differences, then a non-empty string is returned
// describing the differences.
func (d *KafkaDispatcher) configDiff(updated *multichannelfanout.Config) string {
return cmp.Diff(d.getConfig(), updated)
}

// UpdateKafkaConsumers will be called by new CRD based kafka channel dispatcher controller.
func (d *KafkaDispatcher) UpdateKafkaConsumers(config *multichannelfanout.Config) (map[eventingduck.SubscriberSpec]error, error) {
func (d *KafkaDispatcher) UpdateKafkaConsumers(ctx context.Context, config *multichannelfanout.Config) (map[eventingduck.SubscriberSpec]error, error) {
if config == nil {
return nil, fmt.Errorf("nil config")
}
@@ -218,7 +204,7 @@ func (d *KafkaDispatcher) UpdateKafkaConsumers(config *multichannelfanout.Config
if !exists {
// only subscribe when not exists in channel-subscriptions map
// do not need to resubscribe every time channel fanout config is updated
if err := d.subscribe(channelRef, sub); err != nil {
if err := d.subscribe(ctx, channelRef, sub); err != nil {
failedToSubscribe[subSpec] = err
}
}
@@ -312,15 +298,15 @@ func (d *KafkaDispatcher) Start(ctx context.Context) error {

// subscribe reads kafkaConsumers which gets updated in UpdateConfig in a separate go-routine.
// subscribe must be called under updateLock.
func (d *KafkaDispatcher) subscribe(channelRef eventingchannels.ChannelReference, sub subscription) error {
func (d *KafkaDispatcher) subscribe(ctx context.Context, channelRef eventingchannels.ChannelReference, sub subscription) error {
d.logger.Info("Subscribing", zap.Any("channelRef", channelRef), zap.Any("subscription", sub))

topicName := d.topicFunc(utils.KafkaChannelSeparator, channelRef.Namespace, channelRef.Name)
groupID := fmt.Sprintf("kafka.%s.%s.%s", sub.Namespace, channelRef.Name, sub.Name)

handler := &consumerMessageHandler{d.logger, sub, d.dispatcher}

consumerGroup, err := d.kafkaConsumerFactory.StartConsumerGroup(groupID, []string{topicName}, d.logger, handler)
consumerGroup, err := d.kafkaConsumerFactory.StartConsumerGroupWithContext(ctx, groupID, []string{topicName}, d.logger, handler)

if err != nil {
// we can not create a consumer - logging that, with reason
@@ -332,7 +318,7 @@ func (d *KafkaDispatcher) subscribe(channelRef eventingchannels.ChannelReference
// this goroutine logs errors incoming
go func() {
for err = range consumerGroup.Errors() {
panic(err)
d.logger.Error("Consumer group", zap.String("id", groupID), zap.Error(err))
}
}()

@@ -363,13 +349,6 @@ func (d *KafkaDispatcher) unsubscribe(channel eventingchannels.ChannelReference,
}
return nil
}
func (d *KafkaDispatcher) getConfig() *multichannelfanout.Config {
return d.config.Load().(*multichannelfanout.Config)
}

func (d *KafkaDispatcher) setConfig(config *multichannelfanout.Config) {
d.config.Store(config)
}

func (d *KafkaDispatcher) getHostToChannelMap() map[string]eventingchannels.ChannelReference {
return d.hostToChannelMap.Load().(map[string]eventingchannels.ChannelReference)
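The NOTE added above the Handle change is the heart of the redelivery behaviour on the Kafka side: the boolean a handler returns is what decides whether the consumer-group offset gets marked, so a failed dispatch leaves the offset where it was and the message stays eligible for redelivery. Below is a minimal, self-contained sketch of that contract; the dispatch function and errFlaky error are hypothetical stand-ins for the real MessageDispatcher, not code from this PR.

package main

import (
    "context"
    "errors"
    "fmt"

    "github.com/Shopify/sarama"
)

// errFlaky simulates a subscriber that is temporarily unavailable.
var errFlaky = errors.New("subscriber returned 503")

// dispatch is a hypothetical stand-in for DispatchMessage: it either
// delivers the message to the subscriber or returns an error.
func dispatch(ctx context.Context, msg *sarama.ConsumerMessage) error {
    if len(msg.Value) == 0 {
        return errFlaky
    }
    return nil
}

// handle mirrors the consumerMessageHandler contract in the diff: return
// true only when the message was actually delivered, so the offset is
// committed only for delivered messages.
func handle(ctx context.Context, msg *sarama.ConsumerMessage) (bool, error) {
    err := dispatch(ctx, msg)
    return err == nil, err
}

func main() {
    ok, err := handle(context.Background(), &sarama.ConsumerMessage{})
    fmt.Println(ok, err) // false, "subscriber returned 503": offset not marked, message redelivered
}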
2 changes: 1 addition & 1 deletion kafka/channel/pkg/dispatcher/dispatcher_it_test.go
@@ -190,7 +190,7 @@ func TestDispatcher(t *testing.T) {
t.Fatal(err)
}

failed, err := dispatcher.UpdateKafkaConsumers(&config)
failed, err := dispatcher.UpdateKafkaConsumers(context.TODO(), &config)
if err != nil {
t.Fatal(err)
}
9 changes: 6 additions & 3 deletions kafka/channel/pkg/dispatcher/dispatcher_test.go
@@ -59,6 +59,10 @@ func (c mockKafkaConsumerFactory) StartConsumerGroup(groupID string, topics []st
return mockConsumerGroup{}, nil
}

func (c mockKafkaConsumerFactory) StartConsumerGroupWithContext(ctx context.Context, groupID string, topics []string, logger *zap.Logger, handler kafka.KafkaConsumerHandler) (sarama.ConsumerGroup, error) {
return c.StartConsumerGroup(groupID, topics, logger, handler)
}

var _ kafka.KafkaConsumerGroupFactory = (*mockKafkaConsumerFactory)(nil)

type mockConsumerGroup struct{}
@@ -85,7 +89,7 @@ func (d *KafkaDispatcher) checkConfigAndUpdate(config *multichannelfanout.Config
return errors.New("nil config")
}

if _, err := d.UpdateKafkaConsumers(config); err != nil {
if _, err := d.UpdateKafkaConsumers(context.TODO(), config); err != nil {
// failed to update dispatchers consumers
return err
}
@@ -317,7 +321,6 @@ func TestDispatcher_UpdateConfig(t *testing.T) {
topicFunc: utils.TopicName,
logger: zaptest.NewLogger(t),
}
d.setConfig(&multichannelfanout.Config{})
d.setHostToChannelMap(map[string]eventingchannels.ChannelReference{})

// Initialize using oldConfig
@@ -403,7 +406,7 @@ func TestSubscribeError(t *testing.T) {
UID: "test-sub",
},
}
err := d.subscribe(channelRef, subRef)
err := d.subscribe(context.TODO(), channelRef, subRef)
if err == nil {
t.Errorf("Expected error want %s, got %s", "error creating consumer", err)
}
20 changes: 10 additions & 10 deletions kafka/channel/pkg/reconciler/dispatcher/kafkachannel.go
@@ -248,13 +248,13 @@ func (r *Reconciler) reconcile(ctx context.Context, kc *v1alpha1.KafkaChannel) e
kafkaChannels = append(kafkaChannels, channel)
}
}
config := r.newConfigFromKafkaChannels(kafkaChannels)
config := r.newConfigFromKafkaChannels(ctx, kafkaChannels)
if err := r.kafkaDispatcher.UpdateHostToChannelMap(config); err != nil {
logging.FromContext(ctx).Error("Error updating host to channel map in dispatcher")
return err
}

failedSubscriptionsv1beta1, err := r.kafkaDispatcher.UpdateKafkaConsumers(config)
failedSubscriptionsv1beta1, err := r.kafkaDispatcher.UpdateKafkaConsumers(ctx, config)
if err != nil {
logging.FromContext(ctx).Error("Error updating kafka consumers in dispatcher")
return err
@@ -296,7 +296,7 @@ func (r *Reconciler) createSubscribableStatus(subscribable *eventingduckv1alpha1
}

// newConfigFromKafkaChannels creates a new Config from the list of kafka channels.
func (r *Reconciler) newChannelConfigFromKafkaChannel(c *v1alpha1.KafkaChannel) *multichannelfanout.ChannelConfig {
func (r *Reconciler) newChannelConfigFromKafkaChannel(ctx context.Context, c *v1alpha1.KafkaChannel) *multichannelfanout.ChannelConfig {
channelConfig := multichannelfanout.ChannelConfig{
Namespace: c.Namespace,
Name: c.Name,
@@ -305,9 +305,10 @@
if c.Spec.Subscribable != nil {
newSubs := make([]eventingduckv1beta1.SubscriberSpec, len(c.Spec.Subscribable.Subscribers))
for i, source := range c.Spec.Subscribable.Subscribers {
source.ConvertTo(context.TODO(), &newSubs[i])
fmt.Printf("Converted \n%+v\n To\n%+v\n", source, newSubs[i])
fmt.Printf("Delivery converted \n%+v\nto\n%+v\n", source.Delivery, newSubs[i].Delivery)
source.ConvertTo(ctx, &newSubs[i])
logger := logging.FromContext(ctx).Sugar()
logger.Debugf("Converted \n%+v\n To\n%+v\n", source, newSubs[i])
logger.Debugf("Delivery converted \n%+v\nto\n%+v\n", source.Delivery, newSubs[i].Delivery)
}
channelConfig.FanoutConfig = fanout.Config{
AsyncHandler: true,
@@ -318,10 +319,10 @@ }
}

// newConfigFromKafkaChannels creates a new Config from the list of kafka channels.
func (r *Reconciler) newConfigFromKafkaChannels(channels []*v1alpha1.KafkaChannel) *multichannelfanout.Config {
func (r *Reconciler) newConfigFromKafkaChannels(ctx context.Context, channels []*v1alpha1.KafkaChannel) *multichannelfanout.Config {
cc := make([]multichannelfanout.ChannelConfig, 0)
for _, c := range channels {
channelConfig := r.newChannelConfigFromKafkaChannel(c)
channelConfig := r.newChannelConfigFromKafkaChannel(ctx, c)
cc = append(cc, *channelConfig)
}
return &multichannelfanout.Config{
@@ -342,6 +343,5 @@ func (r *Reconciler) updateStatus(ctx context.Context, desired *v1alpha1.KafkaCh
existing := kc.DeepCopy()
existing.Status = desired.Status

new, err := r.kafkaClientSet.MessagingV1alpha1().KafkaChannels(desired.Namespace).UpdateStatus(existing)
return new, err
return r.kafkaClientSet.MessagingV1alpha1().KafkaChannels(desired.Namespace).UpdateStatus(existing)
}
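The reconciler hunk above swaps the temporary fmt.Printf tracing for the context logger at debug level. A tiny standalone sketch of the same pattern follows; it constructs a zap logger directly, whereas the reconciler obtains its logger from the request context, and the example URIs are invented.

package main

import "go.uber.org/zap"

func main() {
    // Assumption: any *zap.Logger works here; in the reconciler it comes
    // from the context rather than being constructed directly.
    logger, _ := zap.NewDevelopment()
    defer logger.Sync()

    sugar := logger.Sugar()
    src := map[string]string{"subscriberURI": "http://old.example.com"}
    dst := map[string]string{"subscriberURI": "http://new.example.com"}

    // Debug output is emitted only when the debug level is enabled,
    // unlike fmt.Printf, which always writes to stdout.
    sugar.Debugf("Converted \n%+v\n To\n%+v\n", src, dst)
}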
14 changes: 13 additions & 1 deletion kafka/common/pkg/kafka/consumer_factory.go
@@ -28,6 +28,7 @@ var newConsumerGroupFromClient = sarama.NewConsumerGroupFromClient
// Kafka consumer factory creates the ConsumerGroup and start consuming the specified topic
type KafkaConsumerGroupFactory interface {
StartConsumerGroup(groupID string, topics []string, logger *zap.Logger, handler KafkaConsumerHandler) (sarama.ConsumerGroup, error)
StartConsumerGroupWithContext(ctx context.Context, groupID string, topics []string, logger *zap.Logger, handler KafkaConsumerHandler) (sarama.ConsumerGroup, error)
}

type kafkaConsumerGroupFactoryImpl struct {
@@ -68,6 +69,17 @@ func (c customConsumerGroup) Errors() <-chan error {
var _ sarama.ConsumerGroup = (*customConsumerGroup)(nil)

func (c kafkaConsumerGroupFactoryImpl) StartConsumerGroup(groupID string, topics []string, logger *zap.Logger, handler KafkaConsumerHandler) (sarama.ConsumerGroup, error) {
return c.StartConsumerGroupWithContext(context.Background(), groupID, topics, logger, handler)
}

func (c kafkaConsumerGroupFactoryImpl) StartConsumerGroupWithContext(
ctx context.Context,
groupID string,
topics []string,
logger *zap.Logger,
handler KafkaConsumerHandler,
) (sarama.ConsumerGroup, error) {

consumerGroup, err := newConsumerGroupFromClient(groupID, c.client)

if err != nil {
@@ -77,7 +89,7 @@ func (c kafkaConsumerGroupFactoryImpl) StartConsumerGroup(groupID string, topics
consumerHandler := NewConsumerHandler(logger, handler)

go func() {
err2 := consumerGroup.Consume(context.TODO(), topics, &consumerHandler)
err2 := consumerGroup.Consume(ctx, topics, &consumerHandler)
if err2 != nil {
consumerHandler.errors <- err2
}
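With the factory change above, the consume loop inherits the caller's context instead of context.TODO(), so cancelling that context is what eventually stops consumption. The following is a hypothetical usage sketch, not code from this PR: the broker address, group and topic names, and the eventing-contrib import path are assumptions.

package main

import (
    "context"
    "log"
    "time"

    "github.com/Shopify/sarama"
    "go.uber.org/zap"

    "knative.dev/eventing-contrib/kafka/common/pkg/kafka" // assumed import path for this repo
)

// printHandler implements the kafka.KafkaConsumerHandler interface from the diff.
type printHandler struct{}

func (printHandler) Handle(ctx context.Context, m *sarama.ConsumerMessage) (bool, error) {
    log.Printf("message at offset %d", m.Offset)
    return true, nil // mark the offset as processed
}

func main() {
    config := sarama.NewConfig()
    config.Version = sarama.V2_0_0_0
    client, err := sarama.NewClient([]string{"localhost:9092"}, config) // assumed broker
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    logger, _ := zap.NewDevelopment()
    factory := kafka.NewConsumerGroupFactory(client)

    ctx, cancel := context.WithCancel(context.Background())
    group, err := factory.StartConsumerGroupWithContext(ctx, "kafka.default.my-channel.my-sub", []string{"my-topic"}, logger, printHandler{})
    if err != nil {
        log.Fatal(err)
    }
    defer group.Close()

    time.Sleep(30 * time.Second)
    cancel() // cancelling the context stops the Consume call started by the factory
}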
66 changes: 51 additions & 15 deletions kafka/common/pkg/kafka/consumer_handler.go
@@ -18,33 +18,55 @@ package kafka

import (
"context"
"errors"
"fmt"
"time"

"github.com/Shopify/sarama"
"k8s.io/apimachinery/pkg/util/wait"

"go.uber.org/zap"
)

var (
PermanentError = errors.New("unrecoverable error")

// TODO make configurable
defaultBackoff = wait.Backoff{
Steps: 20,
Duration: 10 * time.Millisecond,
Factor: 2.0,
Jitter: 0.1,
}
)

type KafkaConsumerHandler interface {
// When this function returns true, the consumer group offset is committed.
// The returned error is enqueued in errors channel.
Handle(context context.Context, message *sarama.ConsumerMessage) (bool, error)
}

// ConsumerHandler implements sarama.ConsumerGroupHandler and provides some glue code to simplify message handling
// You must implement KafkaConsumerHandler and create a new saramaConsumerHandler with it
// saramaConsumerHandler implements sarama.ConsumerGroupHandler and provides some glue code to simplify message handling
type saramaConsumerHandler struct {
// The user message handler
handler KafkaConsumerHandler

backoff wait.Backoff

logger *zap.Logger
// Errors channel
errors chan error
}

func NewConsumerHandler(logger *zap.Logger, handler KafkaConsumerHandler) saramaConsumerHandler {
return NewConsumerHandlerWithBackoff(logger, handler, defaultBackoff)
}

func NewConsumerHandlerWithBackoff(logger *zap.Logger, handler KafkaConsumerHandler, backoff wait.Backoff) saramaConsumerHandler {
return saramaConsumerHandler{
logger: logger,
handler: handler,
backoff: backoff,
logger: logger,
errors: make(chan error, 10), // Some buffering to avoid blocking the message processing
}
}
@@ -63,33 +85,47 @@ func (consumer *saramaConsumerHandler) Cleanup(session sarama.ConsumerGroupSessi
// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (consumer *saramaConsumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
consumer.logger.Info(fmt.Sprintf("Starting partition consumer, topic: %s, partition: %d, initialOffset: %d", claim.Topic(), claim.Partition(), claim.InitialOffset()))
defer consumer.logger.Info(fmt.Sprintf("Stopping partition consumer, topic: %s, partition: %d", claim.Topic(), claim.Partition()))

// NOTE:
// Do not move the code below to a goroutine.
// The `ConsumeClaim` itself is called within a goroutine, see:
// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
// NOTE: Do not move the code below to a goroutine. The `ConsumeClaim` itself is called within a goroutine.
for message := range claim.Messages() {
if ce := consumer.logger.Check(zap.DebugLevel, "debugging"); ce != nil {
consumer.logger.Debug("Message claimed", zap.String("topic", message.Topic), zap.Binary("value", message.Value))
consumer.logger.Debug("message", zap.String("topic", message.Topic), zap.Int32("partition", message.Partition), zap.Int64("Offset", message.Offset))
}

mustMark, err := consumer.handler.Handle(session.Context(), message)

err := consumer.handleWithExponentialBackOffRetries(session, message)
if err != nil {
consumer.logger.Info("Failure while handling a message", zap.String("topic", message.Topic), zap.Int32("partition", message.Partition), zap.Int64("offset", message.Offset), zap.Error(err))
consumer.errors <- err
consumer.errors <- PermanentError
}
if mustMark {
}

return nil
}

func (consumer *saramaConsumerHandler) handleWithExponentialBackOffRetries(
session sarama.ConsumerGroupSession,
message *sarama.ConsumerMessage,
) error {
return wait.ExponentialBackoff(consumer.backoff, func() (bool, error) {

Review comment (Member): we want to rely on the CloudEvent SDK to do the retry. Also look at the delivery spec for additional strategies.

Reply (Contributor): Not here, because we use bindings apis so we don't rely on client

shouldMark, err := consumer.handler.Handle(session.Context(), message)

if shouldMark {
session.MarkMessage(message, "") // Mark kafka message as processed
if ce := consumer.logger.Check(zap.DebugLevel, "debugging"); ce != nil {
consumer.logger.Debug("Message marked", zap.String("topic", message.Topic), zap.Binary("value", message.Value))
}
}

}
if err != nil {
consumer.logger.Info("Failure while handling a message", zap.String("topic", message.Topic), zap.Int32("partition", message.Partition), zap.Int64("offset", message.Offset), zap.Error(err))
consumer.errors <- err
return false, nil
}

consumer.logger.Info(fmt.Sprintf("Stopping partition consumer, topic: %s, partition: %d", claim.Topic(), claim.Partition()))
return nil
return true, nil
})
}

var _ sarama.ConsumerGroupHandler = (*saramaConsumerHandler)(nil)
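The retry itself is the wait.ExponentialBackoff call above, driven by defaultBackoff (up to 20 attempts, starting at 10ms and doubling with 10% jitter); the review thread earlier in this file questions whether retries should instead live in the CloudEvents SDK. Below is a standalone sketch of the backoff helper's semantics: returning (false, nil) from the condition schedules another attempt, (true, nil) stops, and exhausting the steps yields wait.ErrWaitTimeout in the apimachinery version of that era. The flaky-subscriber condition is invented for illustration.

package main

import (
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
)

func main() {
    // Same shape as defaultBackoff in the diff: up to 20 attempts,
    // starting at 10ms and doubling each time, with 10% jitter.
    backoff := wait.Backoff{
        Steps:    20,
        Duration: 10 * time.Millisecond,
        Factor:   2.0,
        Jitter:   0.1,
    }

    attempts := 0
    err := wait.ExponentialBackoff(backoff, func() (bool, error) {
        attempts++
        // Pretend the subscriber only succeeds on the fourth delivery attempt.
        if attempts < 4 {
            return false, nil // not done: sleep and retry
        }
        return true, nil // done: stop retrying so the offset can be marked
    })
    fmt.Println(attempts, err) // 4 <nil>
}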