-
Notifications
You must be signed in to change notification settings - Fork 1.8k
/
sync_producer.go
197 lines (159 loc) · 5.95 KB
/
sync_producer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
package sarama
import "sync"
// SyncProducer publishes Kafka messages, blocking until they have been acknowledged. It routes messages to the correct
// broker, refreshing metadata as appropriate, and parses responses for errors. You must call Close() on a producer
// to avoid leaks, it may not be garbage-collected automatically when it passes out of scope.
//
// The SyncProducer comes with two caveats: it will generally be less efficient than the AsyncProducer, and the actual
// durability guarantee provided when a message is acknowledged depend on the configured value of `Producer.RequiredAcks`.
// There are configurations where a message acknowledged by the SyncProducer can still sometimes be lost.
//
// For implementation reasons, the SyncProducer requires `Producer.Return.Errors` and `Producer.Return.Successes` to
// be set to true in its configuration.
type SyncProducer interface {
// SendMessage produces a given message, and returns only when it either has
// succeeded or failed to produce. It will return the partition and the offset
// of the produced message, or an error if the message failed to produce.
SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error)
// SendMessages produces a given set of messages, and returns only when all
// messages in the set have either succeeded or failed. Note that messages
// can succeed and fail individually; if some succeed and some fail,
// SendMessages will return an error.
SendMessages(msgs []*ProducerMessage) error
// Close shuts down the producer; you must call this function before a producer
// object passes out of scope, as it may otherwise leak memory.
// You must call this before calling Close on the underlying client.
Close() error
// TxnStatus return current producer transaction status.
TxnStatus() ProducerTxnStatusFlag
// IsTransactional return true when current producer is transactional.
IsTransactional() bool
// BeginTxn mark current transaction as ready.
BeginTxn() error
// CommitTxn commit current transaction.
CommitTxn() error
// AbortTxn abort current transaction.
AbortTxn() error
// AddOffsetsToTxn add associated offsets to current transaction.
AddOffsetsToTxn(offsets map[string][]*PartitionOffsetMetadata, groupId string) error
// AddMessageToTxn add message offsets to current transaction.
AddMessageToTxn(msg *ConsumerMessage, groupId string, metadata *string) error
}
type syncProducer struct {
producer *asyncProducer
wg sync.WaitGroup
}
// NewSyncProducer creates a new SyncProducer using the given broker addresses and configuration.
func NewSyncProducer(addrs []string, config *Config) (SyncProducer, error) {
if config == nil {
config = NewConfig()
config.Producer.Return.Successes = true
}
if err := verifyProducerConfig(config); err != nil {
return nil, err
}
p, err := NewAsyncProducer(addrs, config)
if err != nil {
return nil, err
}
return newSyncProducerFromAsyncProducer(p.(*asyncProducer)), nil
}
// NewSyncProducerFromClient creates a new SyncProducer using the given client. It is still
// necessary to call Close() on the underlying client when shutting down this producer.
func NewSyncProducerFromClient(client Client) (SyncProducer, error) {
if err := verifyProducerConfig(client.Config()); err != nil {
return nil, err
}
p, err := NewAsyncProducerFromClient(client)
if err != nil {
return nil, err
}
return newSyncProducerFromAsyncProducer(p.(*asyncProducer)), nil
}
func newSyncProducerFromAsyncProducer(p *asyncProducer) *syncProducer {
sp := &syncProducer{producer: p}
sp.wg.Add(2)
go withRecover(sp.handleSuccesses)
go withRecover(sp.handleErrors)
return sp
}
func verifyProducerConfig(config *Config) error {
if !config.Producer.Return.Errors {
return ConfigurationError("Producer.Return.Errors must be true to be used in a SyncProducer")
}
if !config.Producer.Return.Successes {
return ConfigurationError("Producer.Return.Successes must be true to be used in a SyncProducer")
}
return nil
}
func (sp *syncProducer) SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error) {
expectation := make(chan *ProducerError, 1)
msg.expectation = expectation
sp.producer.Input() <- msg
if pErr := <-expectation; pErr != nil {
return -1, -1, pErr.Err
}
return msg.Partition, msg.Offset, nil
}
func (sp *syncProducer) SendMessages(msgs []*ProducerMessage) error {
expectations := make(chan chan *ProducerError, len(msgs))
go func() {
for _, msg := range msgs {
expectation := make(chan *ProducerError, 1)
msg.expectation = expectation
sp.producer.Input() <- msg
expectations <- expectation
}
close(expectations)
}()
var errors ProducerErrors
for expectation := range expectations {
if pErr := <-expectation; pErr != nil {
errors = append(errors, pErr)
}
}
if len(errors) > 0 {
return errors
}
return nil
}
func (sp *syncProducer) handleSuccesses() {
defer sp.wg.Done()
for msg := range sp.producer.Successes() {
expectation := msg.expectation
expectation <- nil
}
}
func (sp *syncProducer) handleErrors() {
defer sp.wg.Done()
for err := range sp.producer.Errors() {
expectation := err.Msg.expectation
expectation <- err
}
}
func (sp *syncProducer) Close() error {
sp.producer.AsyncClose()
sp.wg.Wait()
return nil
}
func (sp *syncProducer) IsTransactional() bool {
return sp.producer.IsTransactional()
}
func (sp *syncProducer) BeginTxn() error {
return sp.producer.BeginTxn()
}
func (sp *syncProducer) CommitTxn() error {
return sp.producer.CommitTxn()
}
func (sp *syncProducer) AbortTxn() error {
return sp.producer.AbortTxn()
}
func (sp *syncProducer) AddOffsetsToTxn(offsets map[string][]*PartitionOffsetMetadata, groupId string) error {
return sp.producer.AddOffsetsToTxn(offsets, groupId)
}
func (sp *syncProducer) AddMessageToTxn(msg *ConsumerMessage, groupId string, metadata *string) error {
return sp.producer.AddMessageToTxn(msg, groupId, metadata)
}
func (p *syncProducer) TxnStatus() ProducerTxnStatusFlag {
return p.producer.TxnStatus()
}