From eb51113a26056068b7a6e9972c75e1fceb63fcfe Mon Sep 17 00:00:00 2001 From: Vitaly Isaev Date: Tue, 30 May 2017 17:48:13 +0300 Subject: [PATCH] rafthttp: configurable stream reader retry timeout Dial timeout was hardcoded to 100ms which may bee too frequent for some use-cases --- rafthttp/peer.go | 30 ++++++++++++++++-------------- rafthttp/stream.go | 13 ++++++++++--- rafthttp/transport.go | 3 +++ 3 files changed, 29 insertions(+), 17 deletions(-) diff --git a/rafthttp/peer.go b/rafthttp/peer.go index a82d7beed74d..ba58d939ef32 100644 --- a/rafthttp/peer.go +++ b/rafthttp/peer.go @@ -181,22 +181,24 @@ func startPeer(transport *Transport, urls types.URLs, peerID types.ID, fs *stats }() p.msgAppV2Reader = &streamReader{ - peerID: peerID, - typ: streamTypeMsgAppV2, - tr: transport, - picker: picker, - status: status, - recvc: p.recvc, - propc: p.propc, + peerID: peerID, + typ: streamTypeMsgAppV2, + tr: transport, + picker: picker, + status: status, + recvc: p.recvc, + propc: p.propc, + dialRetryTimeout: transport.StreamDialRetryTimeout, } p.msgAppReader = &streamReader{ - peerID: peerID, - typ: streamTypeMessage, - tr: transport, - picker: picker, - status: status, - recvc: p.recvc, - propc: p.propc, + peerID: peerID, + typ: streamTypeMessage, + tr: transport, + picker: picker, + status: status, + recvc: p.recvc, + propc: p.propc, + dialRetryTimeout: transport.StreamDialRetryTimeout, } p.msgAppV2Reader.start() p.msgAppReader.start() diff --git a/rafthttp/stream.go b/rafthttp/stream.go index 2a6c620f56dc..a7874ca5dcd6 100644 --- a/rafthttp/stream.go +++ b/rafthttp/stream.go @@ -278,6 +278,8 @@ type streamReader struct { recvc chan<- raftpb.Message propc chan<- raftpb.Message + dialRetryTimeout time.Duration // dial retry timeout + errorc chan<- error mu sync.Mutex @@ -296,6 +298,13 @@ func (r *streamReader) start() { r.errorc = r.tr.ErrorC } + if r.dialRetryTimeout == 0 { + // If client didn't provide timeout value, + // wait 100ms to create a new stream, so it doesn't bring too much + // overhead when retry. + r.dialRetryTimeout = 100 * time.Millisecond + } + go r.run() } @@ -323,9 +332,7 @@ func (cr *streamReader) run() { } } select { - // Wait 100ms to create a new stream, so it doesn't bring too much - // overhead when retry. - case <-time.After(100 * time.Millisecond): + case <-time.After(cr.dialRetryTimeout): case <-cr.stopc: plog.Infof("stopped streaming with peer %s (%s reader)", cr.peerID, t) close(cr.done) diff --git a/rafthttp/transport.go b/rafthttp/transport.go index 1f0b46836e66..b750681e42ca 100644 --- a/rafthttp/transport.go +++ b/rafthttp/transport.go @@ -111,6 +111,9 @@ type Transport struct { // When an error is received from ErrorC, user should stop raft state // machine and thus stop the Transport. ErrorC chan error + // StreamRetryTimeout is used to alter the frequency of streamReader + // dial retrial attempts + StreamDialRetryTimeout time.Duration streamRt http.RoundTripper // roundTripper used by streams pipelineRt http.RoundTripper // roundTripper used by pipelines