Skip to content

Commit

Permalink
Shrink the scope of the partition circuit-breaker
Browse files Browse the repository at this point in the history
It shouldn't wrap the entirety of `assignPartition`, since that also protects
things like the call to actually choose the partition. The only thing it is
supposed to protect is the call to get the list of partitions, since that is
what can block.
  • Loading branch information
eapache committed Jun 9, 2015
1 parent b86f862 commit 0cba7e3
Showing 1 changed file with 10 additions and 11 deletions.
21 changes: 10 additions & 11 deletions async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,10 +271,7 @@ func (p *asyncProducer) partitionDispatcher(topic string, input <-chan *Producer

for msg := range input {
if msg.retries == 0 {
err := breaker.Run(func() error {
return p.assignPartition(partitioner, msg)
})
if err != nil {
if err := p.assignPartition(breaker, partitioner, msg); err != nil {
p.returnError(msg, err)
continue
}
Expand Down Expand Up @@ -636,15 +633,17 @@ func (p *asyncProducer) shutdown() {
close(p.successes)
}

func (p *asyncProducer) assignPartition(partitioner Partitioner, msg *ProducerMessage) error {
func (p *asyncProducer) assignPartition(breaker *breaker.Breaker, partitioner Partitioner, msg *ProducerMessage) error {
var partitions []int32
var err error

if partitioner.RequiresConsistency() {
partitions, err = p.client.Partitions(msg.Topic)
} else {
partitions, err = p.client.WritablePartitions(msg.Topic)
}
err := breaker.Run(func() (err error) {
if partitioner.RequiresConsistency() {
partitions, err = p.client.Partitions(msg.Topic)
} else {
partitions, err = p.client.WritablePartitions(msg.Topic)
}
return
})

if err != nil {
return err
Expand Down

0 comments on commit 0cba7e3

Please sign in to comment.