[SEF-3223] Add Kafka PubSub and Kafka transport clients #102

Merged: 3 commits, Apr 17, 2024
go.mod (4 changes: 3 additions & 1 deletion)
@@ -8,6 +8,7 @@ require (
     github.com/DataDog/datadog-go v4.8.2+incompatible
     github.com/aws/aws-sdk-go v1.34.28
     github.com/getsentry/sentry-go v0.12.0
+    github.com/go-kit/kit v0.9.0
     github.com/google/uuid v1.6.0
     github.com/grpc-ecosystem/go-grpc-middleware v1.0.0
     github.com/magefile/mage v1.15.0
@@ -16,6 +17,7 @@ require (
     github.com/spf13/viper v1.10.1
     github.com/stretchr/testify v1.8.4
     github.com/twmb/franz-go v1.12.1
+    github.com/twmb/franz-go/pkg/kmsg v1.4.0
     google.golang.org/grpc v1.60.1
     google.golang.org/protobuf v1.32.0
     gopkg.in/DataDog/dd-trace-go.v1 v1.47.0
@@ -38,6 +40,7 @@ require (
     github.com/dgraph-io/ristretto v0.1.0 // indirect
     github.com/dustin/go-humanize v1.0.0 // indirect
     github.com/fsnotify/fsnotify v1.5.1 // indirect
+    github.com/go-logfmt/logfmt v0.6.0 // indirect
     github.com/go-sql-driver/mysql v1.7.0 // indirect
     github.com/golang/glog v1.1.2 // indirect
     github.com/golang/protobuf v1.5.3 // indirect
@@ -67,7 +70,6 @@ require (
     github.com/spf13/pflag v1.0.5 // indirect
     github.com/subosito/gotenv v1.2.0 // indirect
     github.com/tinylib/msgp v1.1.6 // indirect
-    github.com/twmb/franz-go/pkg/kmsg v1.4.0 // indirect
     go4.org/intern v0.0.0-20211027215823-ae77deb06f29 // indirect
     go4.org/unsafe/assume-no-moving-gc v0.0.0-20220617031537-928513b29760 // indirect
     golang.org/x/net v0.20.0 // indirect
go.sum (6 changes: 6 additions & 0 deletions)
@@ -70,10 +70,16 @@ github.com/gin-gonic/gin v1.4.0/go.mod h1:OW2EZn3DO8Ln9oIKOvM++LBO+5UPHJJDH72/q/
 github.com/go-check/check v0.0.0-20180628173108-788fd7840127/go.mod h1:9ES+weclKsC9YodN5RgxqK/VD9HM9JsCSh7rNhMZE98=
 github.com/go-errors/errors v1.0.1 h1:LUHzmkK3GUKUrL/1gfBUxAHzcev3apQlezX/+O7ma6w=
 github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q=
+github.com/go-kit/kit v0.9.0 h1:wDJmvq38kDhkVxi50ni9ykkdUr1PKgqKOoi01fa0Mdk=
+github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
+github.com/go-logfmt/logfmt v0.6.0 h1:wGYYu3uicYdqXVgoYbvnkrPVXkuLM1p1ifugDMEdRi4=
+github.com/go-logfmt/logfmt v0.6.0/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
 github.com/go-martini/martini v0.0.0-20170121215854-22fa46961aab/go.mod h1:/P9AEU963A2AYjv4d1V5eVL1CQbEJq6aCNHDDjibzu8=
 github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
 github.com/go-sql-driver/mysql v1.7.0 h1:ueSltNNllEqE3qcWBTD0iQd3IpL/6U+mJxLkazJ7YPc=
 github.com/go-sql-driver/mysql v1.7.0/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
+github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk=
+github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
 github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
 github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo=
 github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
pkg/pubsub/kafka/config.go (new file: 154 additions & 0 deletions)
@@ -0,0 +1,154 @@
package kafka

import (
    "context"
    "crypto/tls"
    "crypto/x509"
    "net"
    "time"

    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/sts"
    "github.com/twmb/franz-go/pkg/kgo"
    awssasl "github.com/twmb/franz-go/pkg/sasl/aws"
    "github.com/twmb/franz-go/pkg/sasl/plain"

    sdklogger "github.com/scribd/go-sdk/pkg/logger"
    "github.com/scribd/go-sdk/pkg/pubsub"
)

// Config provides a common configuration for Kafka PubSub clients.
type Config struct {
    // ApplicationName is used to build the service name reported in tracer spans.
    ApplicationName string
    // KafkaConfig is the Kafka configuration provided by go-sdk.
    KafkaConfig pubsub.Kafka
    // AwsSession is an AWS session reference, used when the AWS MSK IAM
    // authentication mechanism is enabled.
    AwsSession *session.Session
    // MsgHandler is a function that is called when a message is received.
    MsgHandler MsgHandler
    Logger     sdklogger.Logger
}

const tlsConnectionTimeout = 10 * time.Second

func newConfig(c Config, opts ...kgo.Opt) ([]kgo.Opt, error) {
    options := []kgo.Opt{
        kgo.SeedBrokers(c.KafkaConfig.BrokerUrls...),
        kgo.ClientID(c.KafkaConfig.ClientId),
    }

    if c.KafkaConfig.SASL.Enabled {
        switch c.KafkaConfig.SASLMechanism() {
        case pubsub.Plain:
            options = append(options, getPlainSaslOption(c.KafkaConfig.SASL))
        case pubsub.AWSMskIam:
            options = append(options, getAwsMskIamSaslOption(c.KafkaConfig.SASL.AWSMskIam, c.AwsSession))
        }
    }

    if c.KafkaConfig.TLS.Enabled || c.KafkaConfig.SecurityProtocol == "ssl" {
        var caCertPool *x509.CertPool

        if c.KafkaConfig.TLS.Ca != "" {
            caCertPool = x509.NewCertPool()
            caCertPool.AppendCertsFromPEM([]byte(c.KafkaConfig.TLS.Ca))
        }

        var certificates []tls.Certificate
        if c.KafkaConfig.TLS.Cert != "" && c.KafkaConfig.TLS.CertKey != "" {
            cert, err := tls.X509KeyPair([]byte(c.KafkaConfig.TLS.Cert), []byte(c.KafkaConfig.TLS.CertKey))
            if err != nil {
                return nil, err
            }
            certificates = []tls.Certificate{cert}
        }

        // The top-level Cert/CertKey pair overrides TLS.Cert/TLS.CertKey when both are set.
        if c.KafkaConfig.Cert != "" && c.KafkaConfig.CertKey != "" {
            cert, err := tls.X509KeyPair([]byte(c.KafkaConfig.Cert), []byte(c.KafkaConfig.CertKey))
            if err != nil {
                return nil, err
            }
            certificates = []tls.Certificate{cert}
        }

        skipTLSVerify := c.KafkaConfig.TLS.InsecureSkipVerify || !c.KafkaConfig.SSLVerificationEnabled

        tlsDialer := &tls.Dialer{
            NetDialer: &net.Dialer{Timeout: tlsConnectionTimeout},
            Config: &tls.Config{
                InsecureSkipVerify: skipTLSVerify,
                Certificates:       certificates,
                RootCAs:            caCertPool,
            },
        }

        options = append(options, kgo.Dialer(tlsDialer.DialContext))
    }

    options = append(options, opts...)

    return options, nil
}

func getPlainSaslOption(saslConf pubsub.SASL) kgo.Opt {
    return kgo.SASL(plain.Auth{
        User: saslConf.Username,
        Pass: saslConf.Password,
    }.AsMechanism())
}

func getAwsMskIamSaslOption(iamConf pubsub.SASLAwsMskIam, s *session.Session) kgo.Opt {
    var opt kgo.Opt

    // No AWS session provided: fall back to the static credentials from the config.
    if s == nil {
        opt = kgo.SASL(awssasl.Auth{
            AccessKey:    iamConf.AccessKey,
            SecretKey:    iamConf.SecretKey,
            SessionToken: iamConf.SessionToken,
            UserAgent:    iamConf.UserAgent,
        }.AsManagedStreamingIAMMechanism())
    } else {
        opt = kgo.SASL(
            awssasl.ManagedStreamingIAM(func(ctx context.Context) (awssasl.Auth, error) {
                // If no assumable role is provided, take credentials from the provided AWS session.
                if iamConf.AssumableRole == "" {
                    val, err := s.Config.Credentials.Get()
                    if err != nil {
                        return awssasl.Auth{}, err
                    }

                    return awssasl.Auth{
                        AccessKey:    val.AccessKeyID,
                        SecretKey:    val.SecretAccessKey,
                        SessionToken: val.SessionToken,
                        UserAgent:    iamConf.UserAgent,
                    }, nil
                }

                // Otherwise assume the configured role via STS and use the temporary credentials.
                svc := sts.New(s)

                res, stsErr := svc.AssumeRole(&sts.AssumeRoleInput{
                    RoleArn:         &iamConf.AssumableRole,
                    RoleSessionName: &iamConf.SessionName,
                })
                if stsErr != nil {
                    return awssasl.Auth{}, stsErr
                }

                return awssasl.Auth{
                    AccessKey:    *res.Credentials.AccessKeyId,
                    SecretKey:    *res.Credentials.SecretAccessKey,
                    SessionToken: *res.Credentials.SessionToken,
                    UserAgent:    iamConf.UserAgent,
                }, nil
            }),
        )
    }

    return opt
}
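For reviewers, a minimal sketch of how this configuration might be populated for a SASL/PLAIN-over-TLS setup. The field names are the ones referenced in newConfig above; broker values are hypothetical, and a real service would load pubsub.Kafka through go-sdk's config machinery rather than by hand:

package main

import (
    "os"

    "github.com/scribd/go-sdk/pkg/pubsub"
    "github.com/scribd/go-sdk/pkg/pubsub/kafka"
)

func main() {
    // Hypothetical values for illustration only.
    var kcfg pubsub.Kafka
    kcfg.BrokerUrls = []string{"localhost:9092"}
    kcfg.ClientId = "orders-service"
    kcfg.SASL.Enabled = true // mechanism selection sits behind SASLMechanism(), not shown in this diff
    kcfg.SASL.Username = "svc-user"
    kcfg.SASL.Password = os.Getenv("KAFKA_SASL_PASSWORD")
    kcfg.TLS.Enabled = true // routes dialing through the custom tls.Dialer in newConfig

    cfg := kafka.Config{
        ApplicationName: "orders",
        KafkaConfig:     kcfg,
    }
    _ = cfg // passed to the publisher/consumer constructors in this package
}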
pkg/pubsub/kafka/partitionconsumer.go (new file: 42 additions & 0 deletions)
@@ -0,0 +1,42 @@
package kafka

import (
    "context"

    "github.com/twmb/franz-go/pkg/kgo"

    sdkkafka "github.com/scribd/go-sdk/pkg/instrumentation/kafka"
    sdklogger "github.com/scribd/go-sdk/pkg/logger"
)

// pconsumer consumes the records of a single topic partition.
type pconsumer struct {
    pool *pool

    quit chan struct{}
    done chan struct{}
    recs chan *sdkkafka.FetchPartition
}

// consume drains the partition's record channel, dispatching each record to the
// worker pool, until quit is closed.
func (pc *pconsumer) consume(cl *kgo.Client, logger sdklogger.Logger, shouldCommit bool, handler func(*kgo.Record)) {
    defer close(pc.done)

    for {
        select {
        case <-pc.quit:
            return
        case p := <-pc.recs:
            p.EachRecord(func(rec *kgo.Record) {
                pc.pool.Schedule(func() {
                    defer p.ConsumeRecord(rec)

                    handler(rec)
                })
            })
            // Note: the commit covers every record in the batch once it has been
            // dispatched; handlers may still be running in the pool at this point.
            if shouldCommit {
                if err := cl.CommitRecords(context.Background(), p.Records...); err != nil {
                    logger.WithError(err).Errorf("Partition consumer failed to commit records")
                }
            }
        }
    }
}
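This mirrors franz-go's goroutine-per-partition consumption pattern: each assigned partition gets its own pconsumer, so ordering is preserved within a partition while the shared worker pool bounds total concurrency. The poll loop that feeds recs is not part of this diff; in plain franz-go terms (without the sdkkafka instrumentation wrapper) it would look roughly like the sketch below, where the routing map and key type are illustrative:

package example

import (
    "context"

    "github.com/twmb/franz-go/pkg/kgo"
)

// tp identifies a topic partition; illustrative only.
type tp struct {
    topic     string
    partition int32
}

// pollLoop routes each fetched partition to the channel owned by its consumer
// goroutine, preserving per-partition ordering.
func pollLoop(ctx context.Context, cl *kgo.Client, consumers map[tp]chan kgo.FetchTopicPartition) {
    for {
        fetches := cl.PollFetches(ctx)
        if fetches.IsClientClosed() {
            return
        }
        fetches.EachPartition(func(p kgo.FetchTopicPartition) {
            consumers[tp{p.Topic, p.Partition}] <- p
        })
    }
}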
pkg/pubsub/kafka/pool.go (new file: 33 additions & 0 deletions)
@@ -0,0 +1,33 @@
package kafka

// pool is a goroutine pool that caps the number of concurrently running workers.
type pool struct {
    sem  chan struct{}
    work chan func()
}

func newPool(size int) *pool {
    p := &pool{
        sem:  make(chan struct{}, size),
        work: make(chan func()),
    }

    return p
}

// Schedule hands the task to an idle worker if one is waiting; otherwise, if the
// pool is not yet at capacity, it spawns a new worker to run the task. When all
// workers are busy and the pool is full, Schedule blocks until a worker frees up.
func (p *pool) Schedule(task func()) {
    select {
    case p.work <- task:
        return
    case p.sem <- struct{}{}:
        go p.worker(task)
    }
}

func (p *pool) worker(task func()) {
    defer func() { <-p.sem }()

    for {
        task()
        task = <-p.work
    }
}
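The pool spawns workers lazily, up to size: Schedule first offers the task to an idle worker over the unbuffered work channel and only claims a semaphore slot and starts a new goroutine when none is free. A standalone usage sketch, assuming same-package access since pool is unexported:

package kafka

import (
    "fmt"
    "sync"
)

func ExamplePoolUsage() {
    p := newPool(4) // at most 4 tasks run concurrently

    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        i := i
        wg.Add(1)
        p.Schedule(func() {
            defer wg.Done()
            fmt.Println("processing record", i)
        })
    }
    wg.Wait()
}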
pkg/pubsub/kafka/publisher.go (new file: 85 additions & 0 deletions)
@@ -0,0 +1,85 @@
package kafka

import (
    "context"
    "fmt"
    "time"

    "github.com/twmb/franz-go/pkg/kgo"

    sdkkafka "github.com/scribd/go-sdk/pkg/instrumentation/kafka"
)

type (
    Publisher struct {
        producer *sdkkafka.Client
    }
)

const (
    defaultFlushTimeout = time.Second * 10

    publisherServiceNameSuffix = "pubsub-publisher"
)

// NewPublisher creates a Publisher, a thin wrapper around the go-sdk kafka.Client
// that provides an API for publishing Kafka messages.
func NewPublisher(c Config, opts ...kgo.Opt) (*Publisher, error) {
    serviceName := fmt.Sprintf("%s-%s", c.ApplicationName, publisherServiceNameSuffix)

    cfg, err := newConfig(c, opts...)
    if err != nil {
        return nil, err
    }

    cfg = append(cfg, []kgo.Opt{
        kgo.ProduceRequestTimeout(c.KafkaConfig.Publisher.WriteTimeout),
        kgo.RecordRetries(c.KafkaConfig.Publisher.MaxAttempts),
    }...)

    producer, err := sdkkafka.NewClient(cfg, sdkkafka.WithServiceName(serviceName))
    if err != nil {
        return nil, err
    }

    return &Publisher{producer: producer}, nil
}

// Publish publishes a kgo.Record message asynchronously; fn is invoked once
// delivery succeeds or fails.
func (p *Publisher) Publish(ctx context.Context, rec *kgo.Record, fn func(record *kgo.Record, err error)) {
    p.producer.Produce(ctx, rec, fn)
}

// Produce is an alias for Publish to satisfy the Kafka go-kit transport.
func (p *Publisher) Produce(ctx context.Context, rec *kgo.Record, fn func(record *kgo.Record, err error)) {
    p.Publish(ctx, rec, fn)
}

// ProduceSync publishes kgo.Record messages synchronously.
func (p *Publisher) ProduceSync(ctx context.Context, rs ...*kgo.Record) kgo.ProduceResults {
    return p.producer.ProduceSync(ctx, rs...)
}

// GetKafkaProducer returns the underlying kafka client for fine-grained tuning purposes.
func (p *Publisher) GetKafkaProducer() *sdkkafka.Client {
    return p.producer
}

// Stop flushes outstanding messages, waits for in-flight requests to complete
// delivery, and then closes the producer. If the context has no deadline, a
// default flush timeout is applied.
func (p *Publisher) Stop(ctx context.Context) error {
    if _, deadlineSet := ctx.Deadline(); !deadlineSet {
        timeoutCtx, cancel := context.WithTimeout(ctx, defaultFlushTimeout)
        defer cancel()

        ctx = timeoutCtx
    }

    err := p.producer.Flush(ctx)
    if err != nil {
        return err
    }

    p.producer.Close()

    return nil
}
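A minimal end-to-end sketch of the publisher API added here. Broker settings are filled in by hand purely for illustration; a real service would load them (including the Publisher write-timeout and retry fields) via go-sdk's config loading, and the topic and payload are hypothetical:

package main

import (
    "context"
    "log"

    "github.com/twmb/franz-go/pkg/kgo"

    "github.com/scribd/go-sdk/pkg/pubsub"
    "github.com/scribd/go-sdk/pkg/pubsub/kafka"
)

func main() {
    var kcfg pubsub.Kafka
    kcfg.BrokerUrls = []string{"localhost:9092"}
    kcfg.ClientId = "orders-service"

    pub, err := kafka.NewPublisher(kafka.Config{
        ApplicationName: "orders",
        KafkaConfig:     kcfg,
    })
    if err != nil {
        log.Fatal(err)
    }

    // Synchronous produce: returns once the broker acknowledges (or rejects) the record.
    results := pub.ProduceSync(context.Background(), &kgo.Record{
        Topic: "orders.created",
        Value: []byte(`{"id":42}`),
    })
    if err := results.FirstErr(); err != nil {
        log.Fatal(err)
    }

    // No deadline on the context, so Stop applies the default 10s flush timeout.
    if err := pub.Stop(context.Background()); err != nil {
        log.Fatal(err)
    }
}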