Skip to content

Commit

Permalink
Shuffle seed brokers so we don't always connect to the first one prov…
Browse files Browse the repository at this point in the history
…ided
  • Loading branch information
wvanbergen committed May 1, 2015
1 parent 01d71fe commit 1874f69
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 11 deletions.
7 changes: 5 additions & 2 deletions client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package sarama

import (
"math/rand"
"sort"
"sync"
"time"
Expand Down Expand Up @@ -117,8 +118,10 @@ func NewClient(addrs []string, conf *Config) (Client, error) {
cachedPartitionsResults: make(map[string][maxPartitionIndex][]int32),
coordinators: make(map[string]int32),
}
for _, addr := range addrs {
client.seedBrokers = append(client.seedBrokers, NewBroker(addr))

random := rand.New(rand.NewSource(time.Now().UnixNano()))
for _, index := range random.Perm(len(addrs)) {
client.seedBrokers = append(client.seedBrokers, NewBroker(addrs[index]))
}

// do an initial fetch of all cluster metadata by specifing an empty list of topics
Expand Down
26 changes: 17 additions & 9 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,25 +375,33 @@ func TestClientRefreshBehaviour(t *testing.T) {
}

func TestClientResurrectDeadSeeds(t *testing.T) {
seed1 := newMockBroker(t, 1)
seed2 := newMockBroker(t, 2)
seed3 := newMockBroker(t, 3)
addr1 := seed1.Addr()
addr2 := seed2.Addr()
addr3 := seed3.Addr()

initialSeed := newMockBroker(t, 0)
emptyMetadata := new(MetadataResponse)
seed1.Returns(emptyMetadata)
initialSeed.Returns(emptyMetadata)

conf := NewConfig()
conf.Metadata.Retry.Backoff = 0
conf.Metadata.RefreshFrequency = 0
c, err := NewClient([]string{addr1, addr2, addr3}, conf)
c, err := NewClient([]string{initialSeed.Addr()}, conf)
if err != nil {
t.Fatal(err)
}
initialSeed.Close()

client := c.(*client)

seed1 := newMockBroker(t, 1)
seed2 := newMockBroker(t, 2)
seed3 := newMockBroker(t, 3)
addr1 := seed1.Addr()
addr2 := seed2.Addr()
addr3 := seed3.Addr()

// Overwrite the seed brokers with a fixed ordering to make this test deterministic.
safeClose(t, client.seedBrokers[0])
client.seedBrokers = []*Broker{NewBroker(addr1), NewBroker(addr2), NewBroker(addr3)}
client.deadSeeds = []*Broker{}

wg := sync.WaitGroup{}
wg.Add(1)
go func() {
Expand Down

0 comments on commit 1874f69

Please sign in to comment.