*.Retry.BackoffFunc #1160

Merged 3 commits on Feb 15, 2019

This PR adds a BackoffFunc option to the Metadata.Retry, Producer.Retry, and Consumer.Retry configuration namespaces, so retry backoff can be computed dynamically instead of always using the fixed Backoff duration.
17 changes: 15 additions & 2 deletions async_producer.go
@@ -426,6 +426,19 @@ func (p *asyncProducer) newPartitionProducer(topic string, partition int32) chan
return input
}

func (pp *partitionProducer) backoff(retries int) {
var backoff time.Duration
if pp.parent.conf.Producer.Retry.BackoffFunc != nil {
maxRetries := pp.parent.conf.Producer.Retry.Max
backoff = pp.parent.conf.Producer.Retry.BackoffFunc(retries, maxRetries)
} else {
backoff = pp.parent.conf.Producer.Retry.Backoff
}
if backoff > 0 {
time.Sleep(backoff)
}
}

func (pp *partitionProducer) dispatch() {
// try to prefetch the leader; if this doesn't work, we'll do a proper call to `updateLeader`
// on the first message
@@ -440,7 +453,7 @@ func (pp *partitionProducer) dispatch() {
if msg.retries > pp.highWatermark {
// a new, higher, retry level; handle it and then back off
pp.newHighWatermark(msg.retries)
- time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
+ pp.backoff(msg.retries)
} else if pp.highWatermark > 0 {
// we are retrying something (else highWatermark would be 0) but this message is not a *new* retry level
if msg.retries < pp.highWatermark {
@@ -468,7 +481,7 @@ func (pp *partitionProducer) dispatch() {
if pp.output == nil {
if err := pp.updateLeader(); err != nil {
pp.parent.returnError(msg, err)
- time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
+ pp.backoff(msg.retries)
continue
}
Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
68 changes: 68 additions & 0 deletions async_producer_test.go
@@ -6,6 +6,7 @@ import (
"os"
"os/signal"
"sync"
"sync/atomic"
"testing"
"time"
)
@@ -484,6 +485,73 @@ func TestAsyncProducerMultipleRetries(t *testing.T) {
closeProducer(t, producer)
}

func TestAsyncProducerMultipleRetriesWithBackoffFunc(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
leader1 := NewMockBroker(t, 2)
leader2 := NewMockBroker(t, 3)

metadataLeader1 := new(MetadataResponse)
metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
seedBroker.Returns(metadataLeader1)

config := NewConfig()
config.Producer.Flush.Messages = 1
config.Producer.Return.Successes = true
config.Producer.Retry.Max = 4

backoffCalled := make([]int32, config.Producer.Retry.Max+1)
config.Producer.Retry.BackoffFunc = func(retries, maxRetries int) time.Duration {
atomic.AddInt32(&backoffCalled[retries-1], 1)
return 0
}
producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
prodNotLeader := new(ProduceResponse)
prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)

prodSuccess := new(ProduceResponse)
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)

metadataLeader2 := new(MetadataResponse)
metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)

leader1.Returns(prodNotLeader)
seedBroker.Returns(metadataLeader2)
leader2.Returns(prodNotLeader)
seedBroker.Returns(metadataLeader1)
leader1.Returns(prodNotLeader)
seedBroker.Returns(metadataLeader1)
leader1.Returns(prodNotLeader)
seedBroker.Returns(metadataLeader2)
leader2.Returns(prodSuccess)

expectResults(t, producer, 1, 0)

producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
leader2.Returns(prodSuccess)
expectResults(t, producer, 1, 0)

seedBroker.Close()
leader1.Close()
leader2.Close()
closeProducer(t, producer)

for i := 0; i < config.Producer.Retry.Max; i++ {
if atomic.LoadInt32(&backoffCalled[i]) != 1 {
t.Errorf("expected one retry attempt #%d", i)
}
}
if atomic.LoadInt32(&backoffCalled[config.Producer.Retry.Max]) != 0 {
t.Errorf("expected no retry attempt #%d", config.Producer.Retry.Max)
}
}

func TestAsyncProducerOutOfRetries(t *testing.T) {
t.Skip("Enable once bug #294 is fixed.")

20 changes: 17 additions & 3 deletions client.go
@@ -687,8 +687,11 @@ func (client *client) refreshMetadata() error {
func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int) error {
retry := func(err error) error {
if attemptsRemaining > 0 {
+ backoff := client.computeBackoff(attemptsRemaining)
Logger.Printf("client/metadata retrying after %dms... (%d attempts remaining)\n", client.conf.Metadata.Retry.Backoff/time.Millisecond, attemptsRemaining)
- time.Sleep(client.conf.Metadata.Retry.Backoff)
+ if backoff > 0 {
+ time.Sleep(backoff)
+ }
return client.tryRefreshMetadata(topics, attemptsRemaining-1)
}
return err
@@ -816,11 +819,22 @@ func (client *client) cachedController() *Broker {
return client.brokers[client.controllerID]
}

func (client *client) computeBackoff(attemptsRemaining int) time.Duration {
if client.conf.Metadata.Retry.BackoffFunc != nil {
maxRetries := client.conf.Metadata.Retry.Max
retries := maxRetries - attemptsRemaining
return client.conf.Metadata.Retry.BackoffFunc(retries, maxRetries)
} else {
return client.conf.Metadata.Retry.Backoff
}
}

func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemaining int) (*FindCoordinatorResponse, error) {
retry := func(err error) (*FindCoordinatorResponse, error) {
if attemptsRemaining > 0 {
Logger.Printf("client/coordinator retrying after %dms... (%d attempts remaining)\n", client.conf.Metadata.Retry.Backoff/time.Millisecond, attemptsRemaining)
time.Sleep(client.conf.Metadata.Retry.Backoff)
backoff := client.computeBackoff(attemptsRemaining)
Logger.Printf("client/coordinator retrying after %dms... (%d attempts remaining)\n", backoff/time.Millisecond, attemptsRemaining)
time.Sleep(backoff)
return client.getConsumerMetadata(consumerGroup, attemptsRemaining-1)
}
return nil, err
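As an aside on the computeBackoff helper above: tryRefreshMetadata and getConsumerMetadata count attempts down, while the callback receives a count-up retries value. A minimal standalone sketch of that mapping follows; the names mirror the diff, but this is not the library code and the sample durations are made up.

```go
package main

import (
	"fmt"
	"time"
)

// computeBackoff mirrors client.computeBackoff from the diff: when a
// BackoffFunc is configured, the count-down attemptsRemaining is converted
// into a count-up retries value before invoking the callback.
func computeBackoff(backoffFunc func(retries, maxRetries int) time.Duration,
	fixed time.Duration, maxRetries, attemptsRemaining int) time.Duration {
	if backoffFunc != nil {
		retries := maxRetries - attemptsRemaining
		return backoffFunc(retries, maxRetries)
	}
	return fixed
}

func main() {
	// Illustrative callback: grow the delay with each completed retry.
	fn := func(retries, maxRetries int) time.Duration {
		return time.Duration(retries+1) * 100 * time.Millisecond
	}
	// With Metadata.Retry.Max = 3, attemptsRemaining runs 3, 2, 1, so the
	// callback sees retries = 0, 1, 2 and returns 100ms, 200ms, 300ms.
	for _, remaining := range []int{3, 2, 1} {
		fmt.Println(computeBackoff(fn, 250*time.Millisecond, 3, remaining))
	}
}
```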
38 changes: 38 additions & 0 deletions client_test.go
@@ -3,6 +3,7 @@ package sarama
import (
"io"
"sync"
"sync/atomic"
"testing"
"time"
)
@@ -260,6 +261,43 @@ func TestClientGetOffset(t *testing.T) {
safeClose(t, client)
}

func TestClientReceivingUnknownTopicWithBackoffFunc(t *testing.T) {
seedBroker := NewMockBroker(t, 1)

metadataResponse1 := new(MetadataResponse)
seedBroker.Returns(metadataResponse1)

retryCount := int32(0)

config := NewConfig()
config.Metadata.Retry.Max = 1
config.Metadata.Retry.BackoffFunc = func(retries, maxRetries int) time.Duration {
atomic.AddInt32(&retryCount, 1)
return 0
}
client, err := NewClient([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

metadataUnknownTopic := new(MetadataResponse)
metadataUnknownTopic.AddTopic("new_topic", ErrUnknownTopicOrPartition)
seedBroker.Returns(metadataUnknownTopic)
seedBroker.Returns(metadataUnknownTopic)

if err := client.RefreshMetadata("new_topic"); err != ErrUnknownTopicOrPartition {
t.Error("ErrUnknownTopicOrPartition expected, got", err)
}

safeClose(t, client)
seedBroker.Close()

actualRetryCount := atomic.LoadInt32(&retryCount)
if actualRetryCount != 1 {
t.Fatalf("Expected BackoffFunc to be called exactly once, but saw %d", actualRetryCount)
}
}

func TestClientReceivingUnknownTopic(t *testing.T) {
seedBroker := NewMockBroker(t, 1)

12 changes: 12 additions & 0 deletions config.go
@@ -84,6 +84,10 @@ type Config struct {
// How long to wait for leader election to occur before retrying
// (default 250ms). Similar to the JVM's `retry.backoff.ms`.
Backoff time.Duration
// Called to compute backoff time dynamically. Useful for implementing
// more sophisticated backoff strategies. This takes precedence over
// `Backoff` if set.
BackoffFunc func(retries, maxRetries int) time.Duration
}
// How frequently to refresh the cluster metadata in the background.
// Defaults to 10 minutes. Set to 0 to disable. Similar to
@@ -168,6 +172,10 @@ type Config struct {
// (default 100ms). Similar to the `retry.backoff.ms` setting of the
// JVM producer.
Backoff time.Duration
// Called to compute backoff time dynamically. Useful for implementing
// more sophisticated backoff strategies. This takes precedence over
// `Backoff` if set.
BackoffFunc func(retries, maxRetries int) time.Duration
}
}

@@ -185,6 +193,10 @@ type Config struct {
// How long to wait after a failing to read from a partition before
// trying again (default 2s).
Backoff time.Duration
// Called to compute backoff time dynamically. Useful for implementing
// more sophisticated backoff strategies. This takes precedence over
// `Backoff` if set.
BackoffFunc func(retries int) time.Duration
}

// Fetch is the namespace for controlling how many bytes are retrieved by any
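The comments above describe the new hooks: Metadata.Retry and Producer.Retry take a two-argument callback, Consumer.Retry a single-argument one. As a hedged usage sketch, a caller might plug in an exponential backoff like the following; the helper, durations, and cap are illustrative and not part of the PR.

```go
package main

import (
	"time"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()

	// Illustrative exponential backoff: 100ms, 200ms, 400ms, ... capped at 10s.
	expBackoff := func(retries, maxRetries int) time.Duration {
		d := 100 * time.Millisecond * time.Duration(1<<uint(retries))
		if d <= 0 || d > 10*time.Second {
			d = 10 * time.Second
		}
		return d
	}

	// Two-argument hooks added by this PR; they take precedence over the
	// fixed Backoff values when set.
	config.Metadata.Retry.BackoffFunc = expBackoff
	config.Producer.Retry.BackoffFunc = expBackoff

	_ = config // pass config to sarama.NewClient / sarama.NewAsyncProducer as usual
}
```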
17 changes: 16 additions & 1 deletion consumer.go
@@ -314,6 +314,8 @@ type partitionConsumer struct {

fetchSize int32
offset int64

retries int32
}

var errTimedOut = errors.New("timed out feeding messages to the user") // not user-facing
@@ -332,12 +334,21 @@ func (child *partitionConsumer) sendError(err error) {
}
}

func (child *partitionConsumer) computeBackoff() time.Duration {
if child.conf.Consumer.Retry.BackoffFunc != nil {
retries := atomic.AddInt32(&child.retries, 1)
return child.conf.Consumer.Retry.BackoffFunc(int(retries))
} else {
return child.conf.Consumer.Retry.Backoff
}
}

func (child *partitionConsumer) dispatcher() {
for range child.trigger {
select {
case <-child.dying:
close(child.trigger)
- case <-time.After(child.conf.Consumer.Retry.Backoff):
+ case <-time.After(child.computeBackoff()):
if child.broker != nil {
child.consumer.unrefBrokerConsumer(child.broker)
child.broker = nil
@@ -451,6 +462,10 @@ feederLoop:
for response := range child.feeder {
msgs, child.responseResult = child.parseResponse(response)

if child.responseResult == nil {
atomic.StoreInt32(&child.retries, 0)
}

for i, msg := range msgs {
messageSelect:
select {
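The consumer-side hook differs from the others: it only receives the current retry count, which (per the dispatcher and feeder-loop changes above) is incremented before each backoff and reset to zero after a successful fetch response. A minimal sketch of wiring it up, with illustrative durations:

```go
package main

import (
	"time"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()

	// Single-argument hook added by this PR: a linear ramp capped at 5s.
	// retries is 1 on the first backoff and resets to 0 once a fetch
	// succeeds, so repeated failures back off progressively.
	config.Consumer.Retry.BackoffFunc = func(retries int) time.Duration {
		d := time.Duration(retries) * 250 * time.Millisecond
		if d > 5*time.Second {
			d = 5 * time.Second
		}
		return d
	}

	_ = config // pass config to sarama.NewConsumer as usual
}
```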
42 changes: 34 additions & 8 deletions consumer_test.go
@@ -5,6 +5,7 @@ import (
"os"
"os/signal"
"sync"
"sync/atomic"
"testing"
"time"
)
@@ -180,9 +181,7 @@ func TestConsumerDuplicate(t *testing.T) {
broker0.Close()
}

- // If consumer fails to refresh metadata it keeps retrying with frequency
- // specified by `Config.Consumer.Retry.Backoff`.
- func TestConsumerLeaderRefreshError(t *testing.T) {
+ func runConsumerLeaderRefreshErrorTestWithConfig(t *testing.T, config *Config) {
// Given
broker0 := NewMockBroker(t, 100)

@@ -200,11 +199,6 @@ func TestConsumerLeaderRefreshError(t *testing.T) {
SetMessage("my_topic", 0, 123, testMsg),
})

- config := NewConfig()
- config.Net.ReadTimeout = 100 * time.Millisecond
- config.Consumer.Retry.Backoff = 200 * time.Millisecond
- config.Consumer.Return.Errors = true
- config.Metadata.Retry.Max = 0
c, err := NewConsumer([]string{broker0.Addr()}, config)
if err != nil {
t.Fatal(err)
@@ -258,6 +252,38 @@ func TestConsumerLeaderRefreshError(t *testing.T) {
broker0.Close()
}

// If consumer fails to refresh metadata it keeps retrying with frequency
// specified by `Config.Consumer.Retry.Backoff`.
func TestConsumerLeaderRefreshError(t *testing.T) {
config := NewConfig()
config.Net.ReadTimeout = 100 * time.Millisecond
config.Consumer.Retry.Backoff = 200 * time.Millisecond
config.Consumer.Return.Errors = true
config.Metadata.Retry.Max = 0

runConsumerLeaderRefreshErrorTestWithConfig(t, config)
}

func TestConsumerLeaderRefreshErrorWithBackoffFunc(t *testing.T) {
var calls int32 = 0

config := NewConfig()
config.Net.ReadTimeout = 100 * time.Millisecond
config.Consumer.Retry.BackoffFunc = func(retries int) time.Duration {
atomic.AddInt32(&calls, 1)
return 200 * time.Millisecond
}
config.Consumer.Return.Errors = true
config.Metadata.Retry.Max = 0

runConsumerLeaderRefreshErrorTestWithConfig(t, config)

// we expect at least one call to our backoff function
if calls == 0 {
t.Fail()
}
}

func TestConsumerInvalidTopic(t *testing.T) {
// Given
broker0 := NewMockBroker(t, 100)
11 changes: 10 additions & 1 deletion offset_manager.go
@@ -110,6 +110,14 @@ func (om *offsetManager) Close() error {
return nil
}

func (om *offsetManager) computeBackoff(retries int) time.Duration {
if om.conf.Metadata.Retry.BackoffFunc != nil {
return om.conf.Metadata.Retry.BackoffFunc(retries, om.conf.Metadata.Retry.Max)
} else {
return om.conf.Metadata.Retry.Backoff
}
}

func (om *offsetManager) fetchInitialOffset(topic string, partition int32, retries int) (int64, string, error) {
broker, err := om.coordinator()
if err != nil {
@@ -151,10 +159,11 @@ func (om *offsetManager) fetchInitialOffset(topic string, partition int32, retri
if retries <= 0 {
return 0, "", block.Err
}
+ backoff := om.computeBackoff(retries)
select {
case <-om.closing:
return 0, "", block.Err
- case <-time.After(om.conf.Metadata.Retry.Backoff):
+ case <-time.After(backoff):
}
return om.fetchInitialOffset(topic, partition, retries-1)
default: