Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: mq raw config support InstanceName md5 and Addr http prefix #638

Merged
merged 19 commits into from
Dec 29, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 68 additions & 86 deletions pkg/client/rocketmq/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,136 +127,118 @@ func DefaultConfig() *Config {

// StdPushConsumerConfig ...
func StdPushConsumerConfig(name string) *PushConsumerConfig {
rc := RawConfig(constant.ConfigKey("rocketmq." + name))
pc := rc.PushConsumer
// 兼容rocket_client_mq变更,addr需要携带shceme
if len(rc.PushConsumer.Addr) == 0 {
pc.Addr = rc.Addresses
}

pc.Name = name

for ind, addr := range pc.Addr {
if strings.HasPrefix(addr, "http") {
pc.Addr[ind] = addr
} else {
pc.Addr[ind] = "http://" + addr
}
}

// 这里根据mq集群地址的md5,生成默认InstanceName
// 实现自动支持多集群,解决官方库默认不支持多集群消费的问题
if pc.InstanceName == "" {
pc.InstanceName = fmt.Sprintf("%x", md5.Sum([]byte(strings.Join(pc.Addr, ","))))
}

return pc
return RawPushConsumerConfig(constant.ConfigKey("rocketmq." + name))
}

// StdPullConsumerConfig ...
func StdPullConsumerConfig(name string) *PullConsumerConfig {
return RawPullConsumerConfig(constant.ConfigKey("rocketmq." + name))
}

// StdProducerConfig ...
func StdProducerConfig(name string) *ProducerConfig {
return RawProducerConfig(constant.ConfigKey("rocketmq." + name))
}

// RawPushConsumerConfig 返push consume回配置
func RawPushConsumerConfig(name string) *PushConsumerConfig {
var defaultConfig = DefaultConfig()
var pushConsumerConfig = defaultConfig.PushConsumer
if err := conf.UnmarshalKey(name, &defaultConfig, conf.TagName("toml")); err != nil {
xlog.Jupiter().Panic("unmarshal config", xlog.FieldErr(err), xlog.String("key", name), xlog.Any("config", pushConsumerConfig))
}

rc := RawConfig(constant.ConfigKey("rocketmq." + name))
pc := rc.PullConsumer
// 兼容rocket_client_mq变更,addr需要携带shceme
if len(pc.Addr) == 0 {
pc.Addr = rc.Addresses
if len(pushConsumerConfig.Addr) == 0 {
pushConsumerConfig.Addr = defaultConfig.Addresses
}

pc.Name = name
pushConsumerConfig.Name = name

for ind, addr := range pc.Addr {
for ind, addr := range pushConsumerConfig.Addr {
if strings.HasPrefix(addr, "http") {
pc.Addr[ind] = addr
pushConsumerConfig.Addr[ind] = addr
} else {
pc.Addr[ind] = "http://" + addr
pushConsumerConfig.Addr[ind] = "http://" + addr
}
}

// 这里根据mq集群地址的md5,生成默认InstanceName
// 实现自动支持多集群,解决官方库默认不支持多集群消费的问题
if pc.InstanceName == "" {
pc.InstanceName = fmt.Sprintf("%x", md5.Sum([]byte(strings.Join(pc.Addr, ","))))
if pushConsumerConfig.InstanceName == "" {
pushConsumerConfig.InstanceName = fmt.Sprintf("%x", md5.Sum([]byte(strings.Join(pushConsumerConfig.Addr, ","))))
}
return pc

if xdebug.IsDevelopmentMode() {
xdebug.PrettyJsonPrint(name, pushConsumerConfig)
}
return pushConsumerConfig
}

// StdProducerConfig ...
func StdProducerConfig(name string) *ProducerConfig {
// RawPullConsumerConfig 返回pull consume配置
func RawPullConsumerConfig(name string) *PullConsumerConfig {
var defaultConfig = DefaultConfig()
var pullConsumerConfig = defaultConfig.PullConsumer
if err := conf.UnmarshalKey(name, &defaultConfig, conf.TagName("toml")); err != nil {
xlog.Jupiter().Panic("unmarshal config", xlog.FieldErr(err), xlog.String("key", name), xlog.Any("config", pullConsumerConfig))
}

rc := RawConfig(constant.ConfigKey("rocketmq." + name))
pc := rc.Producer
// 兼容rocket_client_mq变更,addr需要携带shceme
if len(pc.Addr) == 0 {
pc.Addr = rc.Addresses
if len(pullConsumerConfig.Addr) == 0 {
pullConsumerConfig.Addr = defaultConfig.Addresses
}

pc.Name = name
for ind, addr := range pc.Addr {
pullConsumerConfig.Name = name

for ind, addr := range pullConsumerConfig.Addr {
if strings.HasPrefix(addr, "http") {
pc.Addr[ind] = addr
pullConsumerConfig.Addr[ind] = addr
} else {
pc.Addr[ind] = "http://" + addr
pullConsumerConfig.Addr[ind] = "http://" + addr
}
}

// 这里根据mq集群地址的md5,生成默认InstanceName
// 实现自动支持多集群,解决官方库默认不支持多集群消费的问题
if pc.InstanceName == "" {
pc.InstanceName = fmt.Sprintf("%x", md5.Sum([]byte(strings.Join(pc.Addr, ","))))
}

return pc
}

// RawConfig 返回配置
func RawConfig(key string) *Config {
var config = DefaultConfig()
if err := conf.UnmarshalKey(key, &config, conf.TagName("toml")); err != nil {
xlog.Jupiter().Panic("unmarshal config", xlog.FieldErr(err), xlog.String("key", key), xlog.Any("config", config))
if pullConsumerConfig.InstanceName == "" {
pullConsumerConfig.InstanceName = fmt.Sprintf("%x", md5.Sum([]byte(strings.Join(pullConsumerConfig.Addr, ","))))
}

if xdebug.IsDevelopmentMode() {
xdebug.PrettyJsonPrint(key, config)
xdebug.PrettyJsonPrint(name, pullConsumerConfig)
}
return config
return pullConsumerConfig
}

// RawPushConsumerConfig 返push consume回配置
func RawPushConsumerConfig(key string) *PushConsumerConfig {
var pushConsumerConfig = DefaultConfig().PushConsumer
if err := conf.UnmarshalKey(key, &pushConsumerConfig, conf.TagName("toml")); err != nil {
xlog.Jupiter().Panic("unmarshal config", xlog.FieldErr(err), xlog.String("key", key), xlog.Any("config", pushConsumerConfig))
// RawProducerConfig 返回produce配置
func RawProducerConfig(name string) *ProducerConfig {
var defaultConfig = DefaultConfig()
var producerConfig = defaultConfig.Producer
if err := conf.UnmarshalKey(name, &defaultConfig, conf.TagName("toml")); err != nil {
xlog.Jupiter().Panic("unmarshal config", xlog.FieldErr(err), xlog.String("key", name), xlog.Any("config", defaultConfig))
}

if xdebug.IsDevelopmentMode() {
xdebug.PrettyJsonPrint(key, pushConsumerConfig)
}
return pushConsumerConfig
}

// RawPullConsumerConfig 返回pull consume配置
func RawPullConsumerConfig(key string) *PullConsumerConfig {
var pullConsumerConfig = DefaultConfig().PullConsumer
if err := conf.UnmarshalKey(key, &pullConsumerConfig, conf.TagName("toml")); err != nil {
xlog.Jupiter().Panic("unmarshal config", xlog.FieldErr(err), xlog.String("key", key), xlog.Any("config", pullConsumerConfig))
// 兼容rocket_client_mq变更,addr需要携带shceme
if len(producerConfig.Addr) == 0 {
producerConfig.Addr = defaultConfig.Addresses
}

if xdebug.IsDevelopmentMode() {
xdebug.PrettyJsonPrint(key, pullConsumerConfig)
producerConfig.Name = name
for ind, addr := range producerConfig.Addr {
if strings.HasPrefix(addr, "http") {
producerConfig.Addr[ind] = addr
} else {
producerConfig.Addr[ind] = "http://" + addr
}
}
return pullConsumerConfig
}

// RawProducerConfig 返回produce配置
func RawProducerConfig(key string) *ProducerConfig {
var producerConfig = DefaultConfig().Producer
if err := conf.UnmarshalKey(key, &producerConfig, conf.TagName("toml")); err != nil {
xlog.Jupiter().Panic("unmarshal config", xlog.FieldErr(err), xlog.String("key", key), xlog.Any("config", producerConfig))
// 这里根据mq集群地址的md5,生成默认InstanceName
// 实现自动支持多集群,解决官方库默认不支持多集群消费的问题
if producerConfig.InstanceName == "" {
producerConfig.InstanceName = fmt.Sprintf("%x", md5.Sum([]byte(strings.Join(producerConfig.Addr, ","))))
}

if xdebug.IsDevelopmentMode() {
xdebug.PrettyJsonPrint(key, producerConfig)
xdebug.PrettyJsonPrint(name, producerConfig)
}
return producerConfig
}