Skip to content

Commit

Permalink
improvement: kafka client creation
Browse files Browse the repository at this point in the history
  • Loading branch information
yinheli committed Jun 15, 2022
1 parent 39b24e2 commit 484eba8
Showing 1 changed file with 28 additions and 11 deletions.
39 changes: 28 additions & 11 deletions pkg/kafkax/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"os"
"sync"
"sync/atomic"
"time"

"github.com/Shopify/sarama"
Expand All @@ -20,6 +21,7 @@ import (

var (
configMap = make(map[string]mqConfig, 8)
clientID int32
)

type mqConfig struct {
Expand Down Expand Up @@ -77,7 +79,7 @@ func New(name string, cfg ...*sarama.Config) (Client, error) {
config = cfg[0]
}
if config == nil {
config = sarama.NewConfig()
config = NewDefaultKafkaConfig()
}

if version, err := sarama.ParseKafkaVersion(c.Version); err != nil {
Expand All @@ -86,27 +88,42 @@ func New(name string, cfg ...*sarama.Config) (Client, error) {
config.Version = version
}

hostname, _ := os.Hostname()
if hostname == "" {
hostname = netx.InternalIp()
if config.ClientID == "" {
config.ClientID = kafkaClientID()
}

if client, err := sarama.NewClient(c.Broker, config); err != nil {
log.Fatal("init kafka client error", zap.Error(err), zap.String("name", c.Name))
} else {
kafka.client = client
}
config.ClientID = fmt.Sprint("sarama", "_", hostname, "_", os.Getpid())

return kafka, nil
}

func NewDefaultKafkaConfig() *sarama.Config {
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
config.Producer.Return.Successes = true
config.Producer.Return.Errors = true

maxMessageBytes := 1024 * 1024 * 10
if config.Producer.MaxMessageBytes < maxMessageBytes {
config.Producer.MaxMessageBytes = maxMessageBytes
}

if client, err := sarama.NewClient(c.Broker, config); err != nil {
log.Fatal("init kafka client error", zap.Error(err), zap.String("name", c.Name))
} else {
kafka.client = client
}
config.ClientID = kafkaClientID()

return kafka, nil
return config
}

func kafkaClientID() string {
atomic.AddInt32(&clientID, 1)
hostname, _ := os.Hostname()
if hostname == "" {
hostname = netx.InternalIp()
}
return fmt.Sprint("sarama", "_", hostname, "_", os.Getpid(), "_", clientID)
}

// Get get kafka client
Expand Down

0 comments on commit 484eba8

Please sign in to comment.