Skip to content

Commit

Permalink
rafthttp: configurable stream reader retry timeout
Browse files Browse the repository at this point in the history
rafthttp.Transport.DialRetryTimeout field alters the frequency of dial attempts
  • Loading branch information
Vitaly Isaev committed May 31, 2017
1 parent e42d517 commit e78ee83
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 11 deletions.
8 changes: 8 additions & 0 deletions rafthttp/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/snap"
"golang.org/x/net/context"
"golang.org/x/time/rate"
)

const (
Expand Down Expand Up @@ -198,6 +199,13 @@ func startPeer(transport *Transport, urls types.URLs, peerID types.ID, fs *stats
recvc: p.recvc,
propc: p.propc,
}

if transport.DialRetryTimeout != 0 {
limit := rate.Every(transport.DialRetryTimeout)
p.msgAppV2Reader.rl = rate.NewLimiter(limit, 1)
p.msgAppReader.rl = rate.NewLimiter(limit, 1)
}

p.msgAppV2Reader.start()
p.msgAppReader.start()

Expand Down
28 changes: 19 additions & 9 deletions rafthttp/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"sync"
"time"

"golang.org/x/time/rate"

"github.com/coreos/etcd/etcdserver/stats"
"github.com/coreos/etcd/pkg/httputil"
"github.com/coreos/etcd/pkg/transport"
Expand Down Expand Up @@ -278,6 +280,8 @@ type streamReader struct {
recvc chan<- raftpb.Message
propc chan<- raftpb.Message

rl *rate.Limiter // alters the frequency of dial retrial attempts

errorc chan<- error

mu sync.Mutex
Expand All @@ -289,14 +293,21 @@ type streamReader struct {
done chan struct{}
}

func (r *streamReader) start() {
r.stopc = make(chan struct{})
r.done = make(chan struct{})
if r.errorc == nil {
r.errorc = r.tr.ErrorC
func (cr *streamReader) start() {
cr.stopc = make(chan struct{})
cr.done = make(chan struct{})
if cr.errorc == nil {
cr.errorc = cr.tr.ErrorC
}

go r.run()
if cr.rl == nil {
// If client didn't provide rate limiter, use the default which will
// wait 100ms to create a new stream, so it doesn't bring too much
// overhead when retry.
cr.rl = rate.NewLimiter(rate.Every(100*time.Millisecond), 1)
}

go cr.run()
}

func (cr *streamReader) run() {
Expand All @@ -323,13 +334,12 @@ func (cr *streamReader) run() {
}
}
select {
// Wait 100ms to create a new stream, so it doesn't bring too much
// overhead when retry.
case <-time.After(100 * time.Millisecond):
case <-cr.stopc:
plog.Infof("stopped streaming with peer %s (%s reader)", cr.peerID, t)
close(cr.done)
return
default:
_ = cr.rl.Wait(context.TODO())
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions rafthttp/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"testing"
"time"

"golang.org/x/time/rate"

"github.com/coreos/etcd/etcdserver/stats"
"github.com/coreos/etcd/pkg/testutil"
"github.com/coreos/etcd/pkg/types"
Expand Down Expand Up @@ -113,6 +115,7 @@ func TestStreamReaderDialRequest(t *testing.T) {
peerID: types.ID(2),
tr: &Transport{streamRt: tr, ClusterID: types.ID(1), ID: types.ID(1)},
picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
rl: rate.NewLimiter(rate.Every(100*time.Millisecond), 1),
}
sr.dial(tt)

Expand Down Expand Up @@ -167,6 +170,7 @@ func TestStreamReaderDialResult(t *testing.T) {
tr: &Transport{streamRt: tr, ClusterID: types.ID(1)},
picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
errorc: make(chan error, 1),
rl: rate.NewLimiter(rate.Every(100*time.Millisecond), 1),
}

_, err := sr.dial(streamTypeMessage)
Expand All @@ -192,6 +196,7 @@ func TestStreamReaderStopOnDial(t *testing.T) {
errorc: make(chan error, 1),
typ: streamTypeMessage,
status: newPeerStatus(types.ID(2)),
rl: rate.NewLimiter(rate.Every(100*time.Millisecond), 1),
}
tr.onResp = func() {
// stop() waits for the run() goroutine to exit, but that exit
Expand Down Expand Up @@ -246,6 +251,7 @@ func TestStreamReaderDialDetectUnsupport(t *testing.T) {
peerID: types.ID(2),
tr: &Transport{streamRt: tr, ClusterID: types.ID(1)},
picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
rl: rate.NewLimiter(rate.Every(100*time.Millisecond), 1),
}

_, err := sr.dial(typ)
Expand Down
6 changes: 4 additions & 2 deletions rafthttp/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,10 @@ type Transporter interface {
// User needs to call Start before calling other functions, and call
// Stop when the Transport is no longer used.
type Transport struct {
DialTimeout time.Duration // maximum duration before timing out dial of the request
TLSInfo transport.TLSInfo // TLS information used when creating connection
DialTimeout time.Duration // maximum duration before timing out dial of the request
DialRetryTimeout time.Duration // alters the frequency of streamReader dial retrial attempts

TLSInfo transport.TLSInfo // TLS information used when creating connection

ID types.ID // local member ID
URLs types.URLs // local peer URLs
Expand Down

0 comments on commit e78ee83

Please sign in to comment.