Skip to content

Commit

Permalink
Merge pull request #1156 from imjustfly/master
Browse files Browse the repository at this point in the history
try fixing a PartitionConsumer's race condition
  • Loading branch information
bai authored Mar 13, 2019
2 parents c53ebd0 + dcc4684 commit da30382
Showing 1 changed file with 9 additions and 7 deletions.
16 changes: 9 additions & 7 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,12 +421,6 @@ func (child *partitionConsumer) AsyncClose() {
func (child *partitionConsumer) Close() error {
child.AsyncClose()

go withRecover(func() {
for range child.messages {
// drain
}
})

var errors ConsumerErrors
for err := range child.errors {
errors = append(errors, err)
Expand Down Expand Up @@ -458,14 +452,22 @@ feederLoop:
for i, msg := range msgs {
messageSelect:
select {
case <-child.dying:
child.broker.acks.Done()
continue feederLoop
case child.messages <- msg:
firstAttempt = true
case <-expiryTicker.C:
if !firstAttempt {
child.responseResult = errTimedOut
child.broker.acks.Done()
remainingLoop:
for _, msg = range msgs[i:] {
child.messages <- msg
select {
case child.messages <- msg:
case <-child.dying:
break remainingLoop
}
}
child.broker.input <- child
continue feederLoop
Expand Down

0 comments on commit da30382

Please sign in to comment.