Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[producer] Fix race between Len() and produceChannel receive #864

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 111 additions & 40 deletions kafka/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,12 @@ type Producer struct {

// Terminates the poller() goroutine
pollerTermChan chan bool

// To make sure we are not receiving from the produce channel at the same
// time as reading its length, an intent to read the length must be
// communicated on this channel by sending `true`, and `false` when
// done. See Len().
lenIntentChan chan bool
}

// String returns a human readable name for a Producer instance
Expand Down Expand Up @@ -325,7 +331,12 @@ func (p *Producer) ProduceChannel() chan *Message {
// as well as delivery reports queued for the application.
// Includes messages on ProduceChannel.
func (p *Producer) Len() int {
return len(p.produceChannel) + len(p.events) + int(C.rd_kafka_outq_len(p.handle.rk))
// This will block indefinitely if Close() has been called before calling Len(). But no
// method should be called after Close().
p.lenIntentChan <- true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not guaranteed that Len() runs before the select: channelProducer may already be at line 578, in which case Len() completes without blocking a read from the channel that has already happened. The sequence is:
g1:578, g2:337, g2:338, g2:339, g1:581

sum := len(p.produceChannel) + len(p.events) + int(C.rd_kafka_outq_len(p.handle.rk))
p.lenIntentChan <- false
return sum
}

// Flush and wait for outstanding messages and requests to complete delivery.
Expand Down Expand Up @@ -523,6 +534,7 @@ func NewProducer(conf *ConfigMap) (*Producer, error) {
p.events = make(chan Event, eventsChanSize)
p.produceChannel = make(chan *Message, produceChannelSize)
p.pollerTermChan = make(chan bool)
p.lenIntentChan = make(chan bool)

if logsChanEnable {
p.handle.setupLogQueue(logsChan, p.pollerTermChan)
Expand Down Expand Up @@ -553,11 +565,39 @@ func NewProducer(conf *ConfigMap) (*Producer, error) {

// channelProducer serves the ProduceChannel channel
func channelProducer(p *Producer) {
for m := range p.produceChannel {
err := p.produce(m, C.RD_KAFKA_MSG_F_BLOCK, nil)
if err != nil {
m.TopicPartition.Error = err
p.events <- m
for {
// If both a new message, and a length intent are available, select
// works on a uniform pseudo-random selection, so we will eventually
// allow the length intent to go through.
select {
case m, ok := <-p.produceChannel:
if !ok {
// Break loop after channel is closed and empty.
return
}
if m == nil {
panic("nil message received on ProduceChannel")
}
err := p.produce(m, C.RD_KAFKA_MSG_F_BLOCK, nil)
if err != nil {
m.TopicPartition.Error = err
p.events <- m
}
case lenIntent := <-p.lenIntentChan:
// If the first message we receive across calls to Len() is not `true`,
// then it means that someone is using the lenIntentChan incorrectly.
if lenIntent != true {
break
}
waitingForLens := 1
for waitingForLens > 0 {
lenIntent = <-p.lenIntentChan
if lenIntent {
waitingForLens += 1
} else {
waitingForLens -= 1
}
}
}
}
}
Expand All @@ -571,48 +611,79 @@ func channelBatchProducer(p *Producer) {
totMsgCnt := 0
totBatchCnt := 0

for m := range p.produceChannel {
buffered[*m.TopicPartition.Topic] = append(buffered[*m.TopicPartition.Topic], m)
bufferedCnt++

loop2:
for true {
select {
case m, ok := <-p.produceChannel:
if !ok {
break loop2
}
if m == nil {
panic("nil message received on ProduceChannel")
}
if m.TopicPartition.Topic == nil {
panic(fmt.Sprintf("message without Topic received on ProduceChannel: %v", m))
}
buffered[*m.TopicPartition.Topic] = append(buffered[*m.TopicPartition.Topic], m)
bufferedCnt++
if bufferedCnt >= batchSize {
for {
// If both a new message, and a length intent are available, select
// works on a uniform pseudo-random selection, so we will eventually
// allow the length intent to go through.
select {
case m, ok := <-p.produceChannel:
if !ok {
// Break loop after channel is closed and empty.
return
}
if m == nil {
panic("nil message received on ProduceChannel")
}
if m.TopicPartition.Topic == nil {
panic(fmt.Sprintf("message without Topic received on ProduceChannel: %v", m))
}
buffered[*m.TopicPartition.Topic] = append(buffered[*m.TopicPartition.Topic], m)
bufferedCnt++

loop2:
for true {
select {
case m, ok := <-p.produceChannel:
if !ok {
break loop2
}
if m == nil {
panic("nil message received on ProduceChannel")
}
if m.TopicPartition.Topic == nil {
panic(fmt.Sprintf("message without Topic received on ProduceChannel: %v", m))
}
buffered[*m.TopicPartition.Topic] = append(buffered[*m.TopicPartition.Topic], m)
bufferedCnt++
if bufferedCnt >= batchSize {
break loop2
}
default:
break loop2
}
default:
break loop2
}
}

totBatchCnt++
totMsgCnt += len(buffered)
totBatchCnt++
totMsgCnt += len(buffered)

for topic, buffered2 := range buffered {
err := p.produceBatch(topic, buffered2, C.RD_KAFKA_MSG_F_BLOCK)
if err != nil {
for _, m = range buffered2 {
m.TopicPartition.Error = err
p.events <- m
for topic, buffered2 := range buffered {
err := p.produceBatch(topic, buffered2, C.RD_KAFKA_MSG_F_BLOCK)
if err != nil {
for _, m = range buffered2 {
m.TopicPartition.Error = err
p.events <- m
}
}
}
}

buffered = make(map[string][]*Message)
bufferedCnt = 0
buffered = make(map[string][]*Message)
bufferedCnt = 0
case lenIntent := <-p.lenIntentChan:
// If the first message we receive across calls to Len() is not `true`,
// then it means that someone is using the lenIntentChan incorrectly.
if lenIntent != true {
break
}
waitingForLens := 1
for waitingForLens > 0 {
lenIntent = <-p.lenIntentChan
if lenIntent {
waitingForLens += 1
} else {
waitingForLens -= 1
}
}
}
}
}

Expand Down
60 changes: 60 additions & 0 deletions kafka/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"encoding/json"
"reflect"
"strings"
"sync"
"testing"
"time"
)
Expand Down Expand Up @@ -721,3 +722,62 @@ func runProducerDeliveryReportFieldTest(t *testing.T, config *ConfigMap, fn func
t.Errorf("Expected empty queue after Flush, still has %d", r)
}
}

// TestProducerLenWithProduceChannel checks that Len() can be called while
// messages are being fed through the ProduceChannel. It runs against a
// MockCluster created with NewMockCluster(1), once with "go.batch.producer"
// disabled and once with it enabled.
func TestProducerLenWithProduceChannel(t *testing.T) {
	const numMessages = 10000

	cluster, clusterErr := NewMockCluster(1)
	if clusterErr != nil {
		t.Fatalf("%s", clusterErr)
	}
	defer cluster.Close()

	// Non-batch mode first, then batch mode.
	for _, batch := range []bool{false, true} {
		runTestProducerLenWithProduceChannel(t, cluster.BootstrapServers(), batch, numMessages)
	}
}

// runTestProducerLenWithProduceChannel creates a Producer against
// bootstrapServers (with "go.batch.producer" set to batch), feeds numMessages
// messages into its ProduceChannel from a background goroutine, and
// concurrently calls Len(). The test fails if Len() has not returned within
// one millisecond.
//
// On success the helper waits for all messages to be enqueued and closes the
// producer. On timeout it reports the failure and returns immediately without
// closing the producer.
func runTestProducerLenWithProduceChannel(t *testing.T, bootstrapServers string, batch bool, numMessages int) {
	// Attribute failures to the calling line so the batch and non-batch runs
	// can be told apart in the test output.
	t.Helper()

	p, err := NewProducer(&ConfigMap{
		"bootstrap.servers": bootstrapServers,
		"go.batch.producer": batch,
	})
	if err != nil {
		t.Fatalf("%s", err)
	}

	topic := "gotest"
	value := "test"

	// Produce `numMessages` messages to the produce channel, with a microsecond gap between each.
	var wg sync.WaitGroup
	wg.Add(numMessages)
	go func() {
		for i := 0; i < numMessages; i++ {
			p.ProduceChannel() <- &Message{TopicPartition: TopicPartition{Topic: &topic, Partition: PartitionAny}, Value: []byte(value)}
			wg.Done()
			time.Sleep(time.Microsecond)
		}
	}()

	// Despite the messages being produced at a high rate, we should be able to get the length
	// within a short amount of time.
	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
	defer cancel()

	// Buffered with capacity 1 so the goroutine below can always complete its
	// send and exit, even if this function has already returned on timeout.
	// With an unbuffered channel that goroutine would block forever.
	lengthObtained := make(chan bool, 1)
	go func() {
		p.Len()
		lengthObtained <- true
	}()

	select {
	case <-ctx.Done():
		t.Errorf("Timed out while calling Len on Producer")
		return
	case <-lengthObtained:
		// no action needed, just end the select.
	}

	wg.Wait()
	p.Close()
}