Skip to content

Commit

Permalink
rafthttp: configurable stream reader retry timeout
Browse files Browse the repository at this point in the history
 Dial timeout was hardcoded to 100ms which may bee too frequent for some use-cases
  • Loading branch information
Vitaly Isaev committed May 30, 2017
1 parent e42d517 commit eb51113
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 17 deletions.
30 changes: 16 additions & 14 deletions rafthttp/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,22 +181,24 @@ func startPeer(transport *Transport, urls types.URLs, peerID types.ID, fs *stats
}()

p.msgAppV2Reader = &streamReader{
peerID: peerID,
typ: streamTypeMsgAppV2,
tr: transport,
picker: picker,
status: status,
recvc: p.recvc,
propc: p.propc,
peerID: peerID,
typ: streamTypeMsgAppV2,
tr: transport,
picker: picker,
status: status,
recvc: p.recvc,
propc: p.propc,
dialRetryTimeout: transport.StreamDialRetryTimeout,
}
p.msgAppReader = &streamReader{
peerID: peerID,
typ: streamTypeMessage,
tr: transport,
picker: picker,
status: status,
recvc: p.recvc,
propc: p.propc,
peerID: peerID,
typ: streamTypeMessage,
tr: transport,
picker: picker,
status: status,
recvc: p.recvc,
propc: p.propc,
dialRetryTimeout: transport.StreamDialRetryTimeout,
}
p.msgAppV2Reader.start()
p.msgAppReader.start()
Expand Down
13 changes: 10 additions & 3 deletions rafthttp/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,8 @@ type streamReader struct {
recvc chan<- raftpb.Message
propc chan<- raftpb.Message

dialRetryTimeout time.Duration // dial retry timeout

errorc chan<- error

mu sync.Mutex
Expand All @@ -296,6 +298,13 @@ func (r *streamReader) start() {
r.errorc = r.tr.ErrorC
}

if r.dialRetryTimeout == 0 {
// If client didn't provide timeout value,
// wait 100ms to create a new stream, so it doesn't bring too much
// overhead when retry.
r.dialRetryTimeout = 100 * time.Millisecond
}

go r.run()
}

Expand Down Expand Up @@ -323,9 +332,7 @@ func (cr *streamReader) run() {
}
}
select {
// Wait 100ms to create a new stream, so it doesn't bring too much
// overhead when retry.
case <-time.After(100 * time.Millisecond):
case <-time.After(cr.dialRetryTimeout):
case <-cr.stopc:
plog.Infof("stopped streaming with peer %s (%s reader)", cr.peerID, t)
close(cr.done)
Expand Down
3 changes: 3 additions & 0 deletions rafthttp/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ type Transport struct {
// When an error is received from ErrorC, user should stop raft state
// machine and thus stop the Transport.
ErrorC chan error
// StreamRetryTimeout is used to alter the frequency of streamReader
// dial retrial attempts
StreamDialRetryTimeout time.Duration

streamRt http.RoundTripper // roundTripper used by streams
pipelineRt http.RoundTripper // roundTripper used by pipelines
Expand Down

0 comments on commit eb51113

Please sign in to comment.