-
Notifications
You must be signed in to change notification settings - Fork 0
/
config.go
133 lines (111 loc) · 3.7 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package wkafka
import (
"fmt"
"regexp"
"time"
)
var DefaultRetryInterval = 10 * time.Second
type Config struct {
// Brokers is a list of kafka brokers to connect to.
// Not all brokers need to be specified, the list is so that
// if one broker is unavailable, another can be used.
// Required at least one broker. Example value is 'localhost:9092'.
Brokers []string `cfg:"brokers"`
Security SecurityConfig `cfg:"security"`
// Compressions is chosen in the order preferred based on broker support.
// The default is to use no compression.
// Available:
// - gzip
// - snappy
// - lz4
// - zstd
Compressions []string `cfg:"compressions"`
// Consumer is a pre configuration for consumer and validation.
Consumer ConsumerPreConfig `cfg:"consumer"`
}
type ConsumerPreConfig struct {
// PrefixGroupID add prefix to group_id.
PrefixGroupID string `cfg:"prefix_group_id"`
// FormatDLQTopic is a format string to generate DLQ topic name.
// - Example is "finops_{{.AppName}}_dlq"
// - It should be exist if DLQ is enabled and topic is not set.
//
// - Available variables:
// - AppName
FormatDLQTopic string `cfg:"format_dlq_topic"`
// Validation is a configuration for validation when consumer initialized.
Validation Validation `cfg:"validation"`
}
// configApply configuration to ConsumerConfig and check validation.
func configApply(c ConsumerPreConfig, consumerConfig *ConsumerConfig, progName string, logger Logger) error {
if c.PrefixGroupID != "" {
consumerConfig.GroupID = c.PrefixGroupID + consumerConfig.GroupID
}
if !consumerConfig.DLQ.Disabled && consumerConfig.DLQ.Topic == "" && c.FormatDLQTopic == "" {
consumerConfig.DLQ.Disabled = true
logger.Warn("dlq is disabled because topic and format_dlq_topic is not set")
}
// add default topic name for DLQ
if !consumerConfig.DLQ.Disabled {
if consumerConfig.DLQ.Topic == "" {
if c.FormatDLQTopic == "" {
return fmt.Errorf("format_dlq_topic is required if dlq topic is not set")
}
var err error
consumerConfig.DLQ.Topic, err = templateRun(c.FormatDLQTopic, map[string]string{"AppName": progName})
if err != nil {
return fmt.Errorf("format_dlq_topic: %w", err)
}
}
if consumerConfig.Skip == nil {
consumerConfig.Skip = map[string]map[int32]OffsetConfig{
consumerConfig.DLQ.Topic: consumerConfig.DLQ.Skip,
}
} else {
if _, ok := consumerConfig.Skip[consumerConfig.DLQ.Topic]; !ok {
consumerConfig.Skip[consumerConfig.DLQ.Topic] = consumerConfig.DLQ.Skip
}
}
if consumerConfig.DLQ.RetryInterval == 0 {
consumerConfig.DLQ.RetryInterval = DefaultRetryInterval
}
}
if err := c.Validation.Validate(consumerConfig); err != nil {
return fmt.Errorf("validate consumer config: %w", err)
}
return nil
}
// Validation is a configuration for validation when consumer initialized.
type Validation struct {
GroupID GroupIDValidation `cfg:"group_id"`
}
// GroupIDValidation is a configuration for group_id validation.
type GroupIDValidation struct {
Enabled bool `cfg:"enabled"`
// RgxGroupID is a regex pattern to validate RgxGroupID.
RgxGroupID string `cfg:"rgx_group_id"`
}
func (v GroupIDValidation) Validate(groupID string) error {
if !v.Enabled {
return nil
}
if groupID == "" {
return fmt.Errorf("group_id is required")
}
if v.RgxGroupID != "" {
rgx, err := regexp.Compile(v.RgxGroupID)
if err != nil {
return fmt.Errorf("group_id validation regex: %w", err)
}
if !rgx.MatchString(groupID) {
return fmt.Errorf("group_id validation failed regex [%s], value [%s]", v.RgxGroupID, groupID)
}
}
return nil
}
func (v Validation) Validate(consumerConfig *ConsumerConfig) error {
if err := v.GroupID.Validate(consumerConfig.GroupID); err != nil {
return err
}
return nil
}