From bd18840fd6c5a84524bdedf91764a26f3d70bbf5 Mon Sep 17 00:00:00 2001 From: Vitaly Isaev Date: Wed, 31 May 2017 12:25:22 +0300 Subject: [PATCH] rafthttp: configurable stream reader retry timeout rafthttp.Transport.DialRetryTimeout field alters the frequency of dial attempts --- rafthttp/peer.go | 8 ++++++++ rafthttp/stream.go | 31 ++++++++++++++++++++++--------- rafthttp/stream_test.go | 6 ++++++ rafthttp/transport.go | 6 ++++-- 4 files changed, 40 insertions(+), 11 deletions(-) diff --git a/rafthttp/peer.go b/rafthttp/peer.go index a82d7beed74..394998514ec 100644 --- a/rafthttp/peer.go +++ b/rafthttp/peer.go @@ -24,6 +24,7 @@ import ( "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/snap" "golang.org/x/net/context" + "golang.org/x/time/rate" ) const ( @@ -198,6 +199,13 @@ func startPeer(transport *Transport, urls types.URLs, peerID types.ID, fs *stats recvc: p.recvc, propc: p.propc, } + + if transport.DialRetryTimeout != 0 { + limit := rate.Every(transport.DialRetryTimeout) + p.msgAppV2Reader.rl = rate.NewLimiter(limit, 1) + p.msgAppReader.rl = rate.NewLimiter(limit, 1) + } + p.msgAppV2Reader.start() p.msgAppReader.start() diff --git a/rafthttp/stream.go b/rafthttp/stream.go index 2a6c620f56d..2f53562d05e 100644 --- a/rafthttp/stream.go +++ b/rafthttp/stream.go @@ -25,6 +25,8 @@ import ( "sync" "time" + "golang.org/x/time/rate" + "github.com/coreos/etcd/etcdserver/stats" "github.com/coreos/etcd/pkg/httputil" "github.com/coreos/etcd/pkg/transport" @@ -278,6 +280,8 @@ type streamReader struct { recvc chan<- raftpb.Message propc chan<- raftpb.Message + rl *rate.Limiter // alters the frequency of dial retrial attempts + errorc chan<- error mu sync.Mutex @@ -289,14 +293,21 @@ type streamReader struct { done chan struct{} } -func (r *streamReader) start() { - r.stopc = make(chan struct{}) - r.done = make(chan struct{}) - if r.errorc == nil { - r.errorc = r.tr.ErrorC +func (cr *streamReader) start() { + cr.stopc = make(chan struct{}) + cr.done = make(chan struct{}) + if cr.errorc == nil { + cr.errorc = cr.tr.ErrorC + } + + if cr.rl == nil { + // If client didn't provide rate limiter, use the default which will + // wait 100ms to create a new stream, so it doesn't bring too much + // overhead when retry. + cr.rl = rate.NewLimiter(rate.Every(100*time.Millisecond), 1) } - go r.run() + go cr.run() } func (cr *streamReader) run() { @@ -323,13 +334,15 @@ func (cr *streamReader) run() { } } select { - // Wait 100ms to create a new stream, so it doesn't bring too much - // overhead when retry. - case <-time.After(100 * time.Millisecond): case <-cr.stopc: plog.Infof("stopped streaming with peer %s (%s reader)", cr.peerID, t) close(cr.done) return + default: + // wait for a while before new dial attempt + if err := cr.rl.Wait(context.TODO()); err != nil { + plog.Errorf("streaming with peer %s (%s reader) rate limiter error: %v", cr.peerID, t, err) + } } } } diff --git a/rafthttp/stream_test.go b/rafthttp/stream_test.go index f48714e7c57..a8985d86b85 100644 --- a/rafthttp/stream_test.go +++ b/rafthttp/stream_test.go @@ -25,6 +25,8 @@ import ( "testing" "time" + "golang.org/x/time/rate" + "github.com/coreos/etcd/etcdserver/stats" "github.com/coreos/etcd/pkg/testutil" "github.com/coreos/etcd/pkg/types" @@ -113,6 +115,7 @@ func TestStreamReaderDialRequest(t *testing.T) { peerID: types.ID(2), tr: &Transport{streamRt: tr, ClusterID: types.ID(1), ID: types.ID(1)}, picker: mustNewURLPicker(t, []string{"http://localhost:2380"}), + rl: rate.NewLimiter(rate.Every(100*time.Millisecond), 1), } sr.dial(tt) @@ -167,6 +170,7 @@ func TestStreamReaderDialResult(t *testing.T) { tr: &Transport{streamRt: tr, ClusterID: types.ID(1)}, picker: mustNewURLPicker(t, []string{"http://localhost:2380"}), errorc: make(chan error, 1), + rl: rate.NewLimiter(rate.Every(100*time.Millisecond), 1), } _, err := sr.dial(streamTypeMessage) @@ -192,6 +196,7 @@ func TestStreamReaderStopOnDial(t *testing.T) { errorc: make(chan error, 1), typ: streamTypeMessage, status: newPeerStatus(types.ID(2)), + rl: rate.NewLimiter(rate.Every(100*time.Millisecond), 1), } tr.onResp = func() { // stop() waits for the run() goroutine to exit, but that exit @@ -246,6 +251,7 @@ func TestStreamReaderDialDetectUnsupport(t *testing.T) { peerID: types.ID(2), tr: &Transport{streamRt: tr, ClusterID: types.ID(1)}, picker: mustNewURLPicker(t, []string{"http://localhost:2380"}), + rl: rate.NewLimiter(rate.Every(100*time.Millisecond), 1), } _, err := sr.dial(typ) diff --git a/rafthttp/transport.go b/rafthttp/transport.go index 1f0b46836e6..a33db3c08cd 100644 --- a/rafthttp/transport.go +++ b/rafthttp/transport.go @@ -94,8 +94,10 @@ type Transporter interface { // User needs to call Start before calling other functions, and call // Stop when the Transport is no longer used. type Transport struct { - DialTimeout time.Duration // maximum duration before timing out dial of the request - TLSInfo transport.TLSInfo // TLS information used when creating connection + DialTimeout time.Duration // maximum duration before timing out dial of the request + DialRetryTimeout time.Duration // alters the frequency of streamReader dial retrial attempts + + TLSInfo transport.TLSInfo // TLS information used when creating connection ID types.ID // local member ID URLs types.URLs // local peer URLs