
Add Kafka TLS support to the producer.
Also, improve how configuration input is consumed when initializing the
Kafka producer.
srikartati committed Apr 29, 2021
1 parent 461dfad commit 356c876
Showing 3 changed files with 82 additions and 18 deletions.
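
For context, here is a minimal usage sketch of the ProducerInput-based initialization added in this commit, with TLS enabled. The broker address, topic, schema name, and certificate paths are placeholders:

    input := producer.ProducerInput{
        KafkaBrokers:     []string{"kafka-broker:9092"}, // placeholder broker address
        KafkaVersion:     sarama.DefaultVersion,
        KafkaTopic:       "test-flow-msgs",
        KafkaProtoSchema: "FlowType1", // placeholder schema name
        KafkaTLSEnabled:  true,
        KafkaCAFile:      "/certs/ca.crt", // placeholder certificate paths
        KafkaTLSCertFile: "/certs/tls.crt",
        KafkaTLSKeyFile:  "/certs/tls.key",
        KafkaLogErrors:   true,
    }
    kafkaProducer, err := producer.InitKafkaProducer(input)
    if err != nil {
        klog.Fatalf("error when initializing Kafka producer: %v", err)
    }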
4 changes: 2 additions & 2 deletions pkg/producer/convertor/test/flowtype_test.go
@@ -242,7 +242,7 @@ func createMsgwithDataSet(t *testing.T, isV6 bool) *entities.Message {

func TestKafkaProducer_Publish(t *testing.T) {
kafkaConfig := sarama.NewConfig()
-kafkaConfig.Version = producer.KafkaConfigVersion
+kafkaConfig.Version = sarama.DefaultVersion
kafkaConfig.Producer.Return.Successes = true
kafkaConfig.Producer.Return.Errors = true

@@ -268,7 +268,7 @@ func TestKafkaProducer_Publish(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mockProducer := saramamock.NewAsyncProducer(t, kafkaConfig)
-kafkaProducer := producer.NewKafkaProducer(mockProducer, "test-flow-msgs", tt.protoSchema)
+kafkaProducer := producer.NewKafkaProducer(mockProducer, false, "test-flow-msgs", tt.protoSchema)

mockProducer.ExpectInputAndSucceed()
mockProducer.ExpectInputAndSucceed()
92 changes: 78 additions & 14 deletions pkg/producer/kafka.go
@@ -15,50 +15,82 @@
package producer

import (
"crypto/tls"
"crypto/x509"
"encoding/binary"

"github.com/Shopify/sarama"
"google.golang.org/protobuf/proto"
"io/ioutil"
"k8s.io/klog"
"log"
"os"

"github.com/vmware/go-ipfix/pkg/entities"
"github.com/vmware/go-ipfix/pkg/producer/convertor"
"github.com/vmware/go-ipfix/pkg/producer/protobuf"
)

-var (
-KafkaConfigVersion sarama.KafkaVersion
-)

type KafkaProducer struct {
producer sarama.AsyncProducer
+enableLogSuccesses bool
topic string
protoSchemaConvertor convertor.IPFIXToKafkaConvertor
}

-func NewKafkaProducer(asyncProducer sarama.AsyncProducer, topic string, schemaType string) *KafkaProducer {
+type ProducerInput struct {
+// KafkaBrokers is the list of Kafka broker addresses.
+KafkaBrokers []string
+KafkaVersion sarama.KafkaVersion
+KafkaTopic string
+KafkaProtoSchema string
+KafkaTLSEnabled bool
+KafkaCAFile string
+KafkaTLSCertFile string
+KafkaTLSKeyFile string
+KafkaTLSSkipVerify bool
+KafkaLogErrors bool
+KafkaLogSuccesses bool
+EnableSaramaDebugLog bool
+}

+func NewKafkaProducer(asyncProducer sarama.AsyncProducer, enableLogSuccesses bool, topic string, schemaType string) *KafkaProducer {
return &KafkaProducer{
producer: asyncProducer,
+enableLogSuccesses: enableLogSuccesses,
topic: topic,
protoSchemaConvertor: convertor.ProtoSchemaConvertor[schemaType](),
}
}

// InitKafkaProducer with broker addresses and other Kafka config parameters.
-func InitKafkaProducer(addrs []string, topic string, protoSchema string, logErrors bool) (*KafkaProducer, error) {
+func InitKafkaProducer(input ProducerInput) (*KafkaProducer, error) {

@shihhaoli commented on May 5, 2021:

Just curious, do we need separate functions like NewKafkaProducer and InitKafkaProducer? For example, our code flow could have places that only need to call NewKafkaProducer() without calling InitKafkaProducer. Otherwise, we can consider combining NewKafkaProducer and InitKafkaProducer into one function. If they are needed separately as public functions, we can also make InitKafkaProducer a method of KafkaProducer, such as:

func (k *KafkaProducer) Initialize() error
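
A minimal sketch of the method-based alternative suggested above (hypothetical; not part of this commit, and the input parameter is an addition to the reviewer's zero-argument signature):

    // Hypothetical shape for the reviewer's suggestion: fold the sarama
    // setup into a method on KafkaProducer rather than a separate
    // InitKafkaProducer function. Field names follow ProducerInput above.
    func (kp *KafkaProducer) Initialize(input ProducerInput) error {
        kafkaConfig := sarama.NewConfig()
        kafkaConfig.Version = input.KafkaVersion
        kafkaConfig.Producer.Return.Successes = input.KafkaLogSuccesses
        kafkaConfig.Producer.Return.Errors = input.KafkaLogErrors
        asyncProducer, err := sarama.NewAsyncProducer(input.KafkaBrokers, kafkaConfig)
        if err != nil {
            return err
        }
        kp.producer = asyncProducer
        return nil
    }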

kafkaConfig := sarama.NewConfig()
-kafkaConfig.Version = KafkaConfigVersion
-kafkaConfig.Producer.Return.Successes = false
-kafkaConfig.Producer.Return.Errors = logErrors
+kafkaConfig.Version = input.KafkaVersion
+kafkaConfig.Producer.Return.Successes = input.KafkaLogSuccesses
+kafkaConfig.Producer.Return.Errors = input.KafkaLogErrors

-asyncProducer, err := sarama.NewAsyncProducer(addrs, kafkaConfig)
+// Initialize the TLS configuration.
+if input.KafkaTLSEnabled {
+if tlsConfig := setupTLSConfig(input.KafkaCAFile, input.KafkaTLSCertFile, input.KafkaTLSKeyFile, input.KafkaTLSSkipVerify); tlsConfig != nil {
+kafkaConfig.Net.TLS.Config = tlsConfig
+kafkaConfig.Net.TLS.Enable = true
+if input.KafkaTLSSkipVerify {
+klog.Info("kafka client TLS enabled (server certificate will not be verified)")
+} else {
+klog.Info("kafka client TLS enabled")
+}
+}
+}
+if input.EnableSaramaDebugLog {
+sarama.Logger = log.New(os.Stderr, "[sarama] ", log.LstdFlags)
+}
+asyncProducer, err := sarama.NewAsyncProducer(input.KafkaBrokers, kafkaConfig)
if err != nil {
return nil, err
}
-producer := NewKafkaProducer(asyncProducer, topic, protoSchema)
+producer := NewKafkaProducer(asyncProducer, input.KafkaLogSuccesses, input.KafkaTopic, input.KafkaProtoSchema)
// Capturing errors from Kafka sarama client
-if logErrors {
+if input.KafkaLogErrors {
go func() {
for msg := range asyncProducer.Errors() {
klog.Error(msg)
@@ -69,6 +101,33 @@ func InitKafkaProducer(addrs []string, topic string, protoSchema string, logErrors bool) (*KafkaProducer, error) {
return producer, nil
}

+func setupTLSConfig(caFile, tlsCertFile, tlsKeyFile string, tlsSkipVerify bool) *tls.Config {
+var t *tls.Config
+
+if tlsCertFile != "" || tlsKeyFile != "" || caFile != "" {
+cert, err := tls.LoadX509KeyPair(tlsCertFile, tlsKeyFile)
+if err != nil {
+klog.Fatalf("kafka TLS load X509 key pair error: %v", err)
+}
+
+caCert, err := ioutil.ReadFile(caFile)
+if err != nil {
+klog.Fatalf("kafka TLS CA file error: %v", err)
+}
+
+caCertPool := x509.NewCertPool()
+caCertPool.AppendCertsFromPEM(caCert)
+
+t = &tls.Config{
+Certificates: []tls.Certificate{cert},
+RootCAs: caCertPool,
+InsecureSkipVerify: tlsSkipVerify,
+}
+}
+
+return t
+}

// SendFlowMessage takes in the flow message in proto schema, encodes it, and sends
// it on the producer channel. If kafkaDelimitMsgWithLen is set to true, it will
// send a length-prefixed encoded message.
@@ -88,6 +147,11 @@ func (kp *KafkaProducer) SendFlowMessage(msg *protobuf.FlowMessage, kafkaDelimitMsgWithLen bool) {
Topic: kp.topic,
Value: sarama.ByteEncoder(bytes),
}
+if kp.enableLogSuccesses {
+kafkaMsg := <-kp.producer.Successes()
+klog.V(2).Infof("Sent the message successfully: %v", kafkaMsg)
+}

}

// Publish takes in a message channel as input and converts all the messages on
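The length-prefix encoding itself sits outside the visible hunks. For reference, a minimal sketch of one common approach with encoding/binary, assuming a hypothetical 4-byte big-endian prefix (the format SendFlowMessage actually uses is not shown in this diff):

    // addLengthPrefix is a hypothetical helper illustrating length-prefixed
    // encoding. It assumes a 4-byte big-endian length prefix, which may not
    // match the format SendFlowMessage actually uses.
    func addLengthPrefix(payload []byte) []byte {
        prefixed := make([]byte, 4+len(payload))
        binary.BigEndian.PutUint32(prefixed, uint32(len(payload)))
        copy(prefixed[4:], payload)
        return prefixed
    }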
4 changes: 2 additions & 2 deletions pkg/test/collector_producer_test.go
@@ -72,12 +72,12 @@ func TestCollectorToProducer(t *testing.T) {
cp, _ := collector.InitCollectingProcess(cpInput)
// Create a mock Kafka producer
kafkaConfig := sarama.NewConfig()
-kafkaConfig.Version = producer.KafkaConfigVersion
+kafkaConfig.Version = sarama.DefaultVersion
kafkaConfig.Producer.Return.Successes = true
kafkaConfig.Producer.Return.Errors = true

mockProducer := saramamock.NewAsyncProducer(t, kafkaConfig)
-kafkaProducer := producer.NewKafkaProducer(mockProducer, "test-flow-msgs", convertortest.FlowType1)
+kafkaProducer := producer.NewKafkaProducer(mockProducer, false, "test-flow-msgs", convertortest.FlowType1)

go cp.Start()
waitForCollectorReady(t, cp)
