Skip to content

Commit

Permalink
Try a different producer in the pool on error (#6)
Browse files Browse the repository at this point in the history
* Try a different producer in the pool on error

* Tweak the error message
  • Loading branch information
prep authored Nov 5, 2019
1 parent 1fddb00 commit 436b9df
Showing 1 changed file with 6 additions and 3 deletions.
9 changes: 6 additions & 3 deletions producer_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
// ProducerPool manages a connection pool of Producers and provides a simple
// interface for balancing Put requests over the pool of connections.
type ProducerPool struct {
config Config
producers []*Producer
stopOnce sync.Once
mu sync.RWMutex
Expand All @@ -21,7 +22,7 @@ type ProducerPool struct {
func NewProducerPool(uris []string, config Config) (*ProducerPool, error) {
config = config.normalize()

var pool ProducerPool
pool := &ProducerPool{config: config}
for _, URI := range uris {
producer, err := NewProducer(URI, config)
if err != nil {
Expand All @@ -32,7 +33,7 @@ func NewProducerPool(uris []string, config Config) (*ProducerPool, error) {
pool.producers = append(pool.producers, producer)
}

return &pool, nil
return pool, nil
}

// Stop all the producers in this pool.
Expand Down Expand Up @@ -65,8 +66,10 @@ func (pool *ProducerPool) Put(ctx context.Context, tube string, body []byte, par
// If a producer is disconnected, try the next one.
case err == ErrDisconnected:
continue
// If a producer returns any other error, log it and try the next one.
case err != nil:
return 0, err
pool.config.ErrorLog.Printf("ProducerPool could not put job: %s", err)
continue
}

return id, nil
Expand Down

0 comments on commit 436b9df

Please sign in to comment.