feat: Add PubSub Kafka client
Neurostep committed Apr 16, 2024
1 parent 968ff20 commit eba9a3f
Showing 6 changed files with 932 additions and 0 deletions.
154 changes: 154 additions & 0 deletions pkg/pubsub/kafka/config.go
@@ -0,0 +1,154 @@
package kafka

import (
"context"
"crypto/tls"
"crypto/x509"
"net"
"time"

"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sts"
"github.com/twmb/franz-go/pkg/kgo"
awssasl "github.com/twmb/franz-go/pkg/sasl/aws"
"github.com/twmb/franz-go/pkg/sasl/plain"

sdklogger "github.com/scribd/go-sdk/pkg/logger"
"github.com/scribd/go-sdk/pkg/pubsub"
)

// Config provides a common configuration for Kafka PubSub clients.
type Config struct {
// ApplicationName is used to build the serviceName reported to tracer spans
ApplicationName string
// Kafka configuration provided by go-sdk
KafkaConfig pubsub.Kafka
// AwsSession is an AWS session reference, used when the AWS MSK IAM authentication mechanism is enabled
AwsSession *session.Session
// MsgHandler is a function that will be called when a message is received
MsgHandler MsgHandler
Logger sdklogger.Logger
}

const tlsConnectionTimeout = 10 * time.Second

func newConfig(c Config, opts ...kgo.Opt) ([]kgo.Opt, error) {
options := []kgo.Opt{
kgo.SeedBrokers(c.KafkaConfig.BrokerUrls...),
kgo.ClientID(c.KafkaConfig.ClientId),
}

if c.KafkaConfig.SASL.Enabled {
switch c.KafkaConfig.SASLMechanism() {
case pubsub.Plain:
options = append(options, getPlainSaslOption(c.KafkaConfig.SASL))
case pubsub.AWSMskIam:
options = append(options, getAwsMskIamSaslOption(c.KafkaConfig.SASL.AWSMskIam, c.AwsSession))
}
}

if c.KafkaConfig.TLS.Enabled || c.KafkaConfig.SecurityProtocol == "ssl" {
var caCertPool *x509.CertPool

if c.KafkaConfig.TLS.Ca != "" {
caCertPool = x509.NewCertPool()
caCertPool.AppendCertsFromPEM([]byte(c.KafkaConfig.TLS.Ca))
}

var certificates []tls.Certificate
if c.KafkaConfig.TLS.Cert != "" && c.KafkaConfig.TLS.CertKey != "" {
cert, err := tls.X509KeyPair([]byte(c.KafkaConfig.TLS.Cert), []byte(c.KafkaConfig.TLS.CertKey))
if err != nil {
return nil, err
}
certificates = []tls.Certificate{cert}
}

// When both pairs are set, these top-level Cert/CertKey fields take
// precedence over TLS.Cert/TLS.CertKey above.
if c.KafkaConfig.Cert != "" && c.KafkaConfig.CertKey != "" {
cert, err := tls.X509KeyPair([]byte(c.KafkaConfig.Cert), []byte(c.KafkaConfig.CertKey))
if err != nil {
return nil, err
}
certificates = []tls.Certificate{cert}
}

skipTLSVerify := c.KafkaConfig.TLS.InsecureSkipVerify || !c.KafkaConfig.SSLVerificationEnabled

tlsDialer := &tls.Dialer{
NetDialer: &net.Dialer{Timeout: tlsConnectionTimeout},
Config: &tls.Config{
InsecureSkipVerify: skipTLSVerify,
Certificates: certificates,
RootCAs: caCertPool,
},
}

options = append(options, kgo.Dialer(tlsDialer.DialContext))
}

options = append(options, opts...)

return options, nil
}

func getPlainSaslOption(saslConf pubsub.SASL) kgo.Opt {
return kgo.SASL(plain.Auth{
User: saslConf.Username,
Pass: saslConf.Password,
}.AsMechanism())
}

func getAwsMskIamSaslOption(iamConf pubsub.SASLAwsMskIam, s *session.Session) kgo.Opt {
var opt kgo.Opt

// No AWS session provided: authenticate with the static credentials from the SASL config
if s == nil {
opt = kgo.SASL(awssasl.Auth{
AccessKey: iamConf.AccessKey,
SecretKey: iamConf.SecretKey,
SessionToken: iamConf.SessionToken,
UserAgent: iamConf.UserAgent,
}.AsManagedStreamingIAMMechanism())
} else {
opt = kgo.SASL(
awssasl.ManagedStreamingIAM(func(ctx context.Context) (awssasl.Auth, error) {
// If no assumable role is provided, fall back to credentials from the provided AWS session
if iamConf.AssumableRole == "" {
val, err := s.Config.Credentials.Get()
if err != nil {
return awssasl.Auth{}, err
}

return awssasl.Auth{
AccessKey: val.AccessKeyID,
SecretKey: val.SecretAccessKey,
SessionToken: val.SessionToken,
UserAgent: iamConf.UserAgent,
}, nil
}

svc := sts.New(s)

res, stsErr := svc.AssumeRole(&sts.AssumeRoleInput{
RoleArn: &iamConf.AssumableRole,
RoleSessionName: &iamConf.SessionName,
})
if stsErr != nil {
return awssasl.Auth{}, stsErr
}

return awssasl.Auth{
AccessKey: *res.Credentials.AccessKeyId,
SecretKey: *res.Credentials.SecretAccessKey,
SessionToken: *res.Credentials.SessionToken,
UserAgent: iamConf.UserAgent,
}, nil
}),
)
}

return opt
}
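
To make the branch selection above concrete, here is a minimal wiring sketch (not part of this commit): a non-nil AwsSession in Config selects the session-backed managed-streaming-IAM path, while leaving it nil uses the static credentials from the SASL config. kafkaCfg and the logger stand in for values a service would load via go-sdk configuration.

package main

import (
	"log"

	"github.com/aws/aws-sdk-go/aws/session"

	sdklogger "github.com/scribd/go-sdk/pkg/logger"
	"github.com/scribd/go-sdk/pkg/pubsub"
	"github.com/scribd/go-sdk/pkg/pubsub/kafka"
)

func main() {
	// Assumption: in a real service these come from go-sdk config loading,
	// with SASL enabled and the AWS MSK IAM mechanism selected.
	var kafkaCfg pubsub.Kafka
	var l sdklogger.Logger

	sess, err := session.NewSession() // default AWS credential chain
	if err != nil {
		log.Fatal(err)
	}

	cfg := kafka.Config{
		ApplicationName: "my-service", // example value
		KafkaConfig:     kafkaCfg,
		AwsSession:      sess, // non-nil session takes the session/STS branch
		Logger:          l,
	}
	_ = cfg // pass to kafka.NewPublisher(cfg) or a subscriber constructor
}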
42 changes: 42 additions & 0 deletions pkg/pubsub/kafka/partitionconsumer.go
@@ -0,0 +1,42 @@
package kafka

import (
"context"

"github.com/twmb/franz-go/pkg/kgo"

sdkkafka "github.com/scribd/go-sdk/pkg/instrumentation/kafka"
sdklogger "github.com/scribd/go-sdk/pkg/logger"
)

type pconsumer struct {
pool *pool

quit chan struct{}
done chan struct{}
recs chan *sdkkafka.FetchPartition
}

func (pc *pconsumer) consume(cl *kgo.Client, logger sdklogger.Logger, shouldCommit bool, handler func(*kgo.Record)) {
defer close(pc.done)

for {
select {
case <-pc.quit:
return
case p := <-pc.recs:
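// Fan the records from this fetched partition out to the shared worker pool.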
p.EachRecord(func(rec *kgo.Record) {
pc.pool.Schedule(func() {
defer p.ConsumeRecord(rec)

handler(rec)
})
})
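// Note: offsets are committed once the records have been scheduled,
// which can happen before every handler has finished running.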
if shouldCommit {
if err := cl.CommitRecords(context.Background(), p.Records...); err != nil {
logger.WithError(err).Errorf("Partition consumer failed to commit records")
}
}
}
}
}
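
For orientation, the subscriber that feeds these consumers is not among the files shown here. A rough sketch of the franz-go goroutine-per-partition poll loop that typically drives such a pconsumer, using plain kgo types only — wrapping into *sdkkafka.FetchPartition is assumed to happen in code not shown in this diff:

// Rough sketch, not from this commit.
func pollLoop(ctx context.Context, cl *kgo.Client, consumers map[string]map[int32]*pconsumer) {
	for {
		fetches := cl.PollRecords(ctx, 10000)
		if fetches.IsClientClosed() {
			return
		}
		fetches.EachPartition(func(p kgo.FetchTopicPartition) {
			if pc, ok := consumers[p.Topic][p.Partition]; ok {
				// In the real code, p.FetchPartition would be wrapped into a
				// *sdkkafka.FetchPartition before being sent on pc.recs.
				_ = pc
			}
		})
	}
}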
33 changes: 33 additions & 0 deletions pkg/pubsub/kafka/pool.go
@@ -0,0 +1,33 @@
package kafka

type pool struct {
sem chan struct{}
work chan func()
}

func newPool(size int) *pool {
p := &pool{
sem: make(chan struct{}, size),
work: make(chan func()),
}

return p
}

// Schedule hands the task to an idle worker if one is waiting; otherwise
// it starts a new worker, up to the pool's capacity. When all workers are
// busy and the pool is full, Schedule blocks until a worker frees up.
func (p *pool) Schedule(task func()) {
select {
case p.work <- task:
return
case p.sem <- struct{}{}:
go p.worker(task)
}
}

func (p *pool) worker(task func()) {
defer func() { <-p.sem }()

for {
task()
task = <-p.work
}
}
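
As an illustration of the semantics (not from the commit): Schedule prefers an idle worker parked on the work channel, spawns a new worker while sem has capacity, and blocks once all size workers are busy. A small sketch, assuming it sits in this same package with fmt and sync imported:

func poolExample() {
	p := newPool(2) // at most two worker goroutines

	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		i := i // capture loop variable
		wg.Add(1)
		p.Schedule(func() {
			defer wg.Done()
			fmt.Println("task", i)
		})
	}
	wg.Wait() // idle workers stay parked on p.work afterwards
}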
85 changes: 85 additions & 0 deletions pkg/pubsub/kafka/publisher.go
@@ -0,0 +1,85 @@
package kafka

import (
"context"
"fmt"
"time"

"github.com/twmb/franz-go/pkg/kgo"

sdkkafka "github.com/scribd/go-sdk/pkg/instrumentation/kafka"
)

type (
Publisher struct {
producer *sdkkafka.Client
}
)

const (
defaultFlushTimeout = time.Second * 10

publisherServiceNameSuffix = "pubsub-publisher"
)

// NewPublisher creates a Publisher, a thin wrapper around the go-sdk kafka.Client
// that provides an API to publish Kafka messages.
func NewPublisher(c Config, opts ...kgo.Opt) (*Publisher, error) {
serviceName := fmt.Sprintf("%s-%s", c.ApplicationName, publisherServiceNameSuffix)

cfg, err := newConfig(c, opts...)
if err != nil {
return nil, err
}

cfg = append(cfg, []kgo.Opt{
kgo.ProduceRequestTimeout(c.KafkaConfig.Publisher.WriteTimeout),
kgo.RecordRetries(c.KafkaConfig.Publisher.MaxAttempts),
}...)

producer, err := sdkkafka.NewClient(cfg, sdkkafka.WithServiceName(serviceName))
if err != nil {
return nil, err
}

return &Publisher{producer: producer}, nil
}

// Publish publishes a kgo.Record message, calling fn when delivery completes.
func (p *Publisher) Publish(ctx context.Context, rec *kgo.Record, fn func(record *kgo.Record, err error)) {
p.producer.Produce(ctx, rec, fn)
}

// Produce is an alias for Publish that satisfies the kafka go-kit transport.
func (p *Publisher) Produce(ctx context.Context, rec *kgo.Record, fn func(record *kgo.Record, err error)) {
p.Publish(ctx, rec, fn)
}

// ProduceSync publishes kgo.Record messages synchronously.
func (p *Publisher) ProduceSync(ctx context.Context, rs ...*kgo.Record) kgo.ProduceResults {
return p.producer.ProduceSync(ctx, rs...)
}

// GetKafkaProducer returns the underlying sdkkafka.Client for fine-grained tuning purposes.
func (p *Publisher) GetKafkaProducer() *sdkkafka.Client {
return p.producer
}

// Stop flushes outstanding messages, waits for in-flight requests to complete
// delivery, and then closes the underlying client.
func (p *Publisher) Stop(ctx context.Context) error {
if _, deadlineSet := ctx.Deadline(); !deadlineSet {
timeoutCtx, cancel := context.WithTimeout(ctx, defaultFlushTimeout)
defer cancel()

ctx = timeoutCtx
}

err := p.producer.Flush(ctx)
if err != nil {
return err
}

p.producer.Close()

return nil
}
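
A short end-to-end usage sketch (not part of the commit; assumes it lives in this package with the pubsub and sdklogger imports added, and with kafkaCfg and logger supplied by the caller):

func publishExample(kafkaCfg pubsub.Kafka, logger sdklogger.Logger) error {
	pub, err := NewPublisher(Config{
		ApplicationName: "my-service", // example value
		KafkaConfig:     kafkaCfg,
		Logger:          logger,
	})
	if err != nil {
		return err
	}

	// Fire-and-forget publish; the callback runs when delivery completes or fails.
	pub.Publish(context.Background(), &kgo.Record{
		Topic: "events", // example topic
		Value: []byte(`{"hello":"world"}`),
	}, func(_ *kgo.Record, err error) {
		if err != nil {
			logger.WithError(err).Errorf("publish failed")
		}
	})

	// Flush anything still in flight (bounded by defaultFlushTimeout) and close.
	return pub.Stop(context.Background())
}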