Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rafthttp: configurable stream reader retry timeout #8003

Merged
merged 4 commits into from
Jun 2, 2017
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions rafthttp/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/snap"
"golang.org/x/net/context"
"golang.org/x/time/rate"
)

const (
Expand Down Expand Up @@ -188,6 +189,7 @@ func startPeer(transport *Transport, urls types.URLs, peerID types.ID, fs *stats
status: status,
recvc: p.recvc,
propc: p.propc,
rl: rate.NewLimiter(transport.DialRetryFrequency, 1),
}
p.msgAppReader = &streamReader{
peerID: peerID,
Expand All @@ -197,7 +199,9 @@ func startPeer(transport *Transport, urls types.URLs, peerID types.ID, fs *stats
status: status,
recvc: p.recvc,
propc: p.propc,
rl: rate.NewLimiter(transport.DialRetryFrequency, 1),
}

p.msgAppV2Reader.start()
p.msgAppReader.start()

Expand Down
59 changes: 32 additions & 27 deletions rafthttp/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"sync"
"time"

"golang.org/x/time/rate"

"github.com/coreos/etcd/etcdserver/stats"
"github.com/coreos/etcd/pkg/httputil"
"github.com/coreos/etcd/pkg/transport"
Expand Down Expand Up @@ -243,7 +245,9 @@ func (cw *streamWriter) closeUnlocked() bool {
if !cw.working {
return false
}
cw.closer.Close()
if err := cw.closer.Close(); err != nil {
plog.Errorf("Peer %s (writer) connection close error: %v", cw.peerID, err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/Peer/peer

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

}
if len(cw.msgc) > 0 {
cw.r.ReportUnreachable(uint64(cw.peerID))
}
Expand Down Expand Up @@ -278,25 +282,28 @@ type streamReader struct {
recvc chan<- raftpb.Message
propc chan<- raftpb.Message

rl *rate.Limiter // alters the frequency of dial retrial attempts

errorc chan<- error

mu sync.Mutex
paused bool
cancel func()
closer io.Closer

stopc chan struct{}
done chan struct{}
ctx context.Context
cancel context.CancelFunc
done chan struct{}
}

func (r *streamReader) start() {
r.stopc = make(chan struct{})
r.done = make(chan struct{})
if r.errorc == nil {
r.errorc = r.tr.ErrorC
func (cr *streamReader) start() {
cr.done = make(chan struct{})
if cr.errorc == nil {
cr.errorc = cr.tr.ErrorC
}

go r.run()
if cr.ctx == nil {
cr.ctx, cr.cancel = context.WithCancel(context.Background())
}
go cr.run()
}

func (cr *streamReader) run() {
Expand All @@ -311,7 +318,7 @@ func (cr *streamReader) run() {
} else {
cr.status.activate()
plog.Infof("established a TCP streaming connection with peer %s (%s reader)", cr.peerID, cr.typ)
err := cr.decodeLoop(rc, t)
err = cr.decodeLoop(rc, t)
plog.Warningf("lost the TCP streaming connection with peer %s (%s reader)", cr.peerID, cr.typ)
switch {
// all data is read out
Expand All @@ -322,15 +329,16 @@ func (cr *streamReader) run() {
cr.status.deactivate(failureType{source: t.String(), action: "read"}, err.Error())
}
}
select {
// Wait 100ms to create a new stream, so it doesn't bring too much
// overhead when retry.
case <-time.After(100 * time.Millisecond):
case <-cr.stopc:
// Wait for a while before new dial attempt
err = cr.rl.Wait(cr.ctx)
if cr.ctx.Err() != nil {
plog.Infof("stopped streaming with peer %s (%s reader)", cr.peerID, t)
close(cr.done)
return
}
if err != nil {
plog.Errorf("streaming with peer %s (%s reader) rate limiter error: %v", cr.peerID, t, err)
}
}
}

Expand All @@ -346,7 +354,7 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
plog.Panicf("unhandled stream type %s", t)
}
select {
case <-cr.stopc:
case <-cr.ctx.Done():
cr.mu.Unlock()
if err := rc.Close(); err != nil {
return err
Expand Down Expand Up @@ -401,11 +409,8 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
}

func (cr *streamReader) stop() {
close(cr.stopc)
cr.mu.Lock()
if cr.cancel != nil {
cr.cancel()
}
cr.cancel()
cr.close()
cr.mu.Unlock()
<-cr.done
Expand All @@ -429,13 +434,11 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {

setPeerURLsHeader(req, cr.tr.URLs)

ctx, cancel := context.WithCancel(context.Background())
req = req.WithContext(ctx)
req = req.WithContext(cr.ctx)

cr.mu.Lock()
cr.cancel = cancel
select {
case <-cr.stopc:
case <-cr.ctx.Done():
cr.mu.Unlock()
return nil, fmt.Errorf("stream reader is stopped")
default:
Expand Down Expand Up @@ -497,7 +500,9 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {

func (cr *streamReader) close() {
if cr.closer != nil {
cr.closer.Close()
if err := cr.closer.Close(); err != nil {
plog.Errorf("Peer %s (reader) connection close error: %v", cr.peerID, err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/Peer/peer

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}
}
cr.closer = nil
}
Expand Down
8 changes: 8 additions & 0 deletions rafthttp/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package rafthttp

import (
"context"
"errors"
"fmt"
"io"
Expand All @@ -25,6 +26,8 @@ import (
"testing"
"time"

"golang.org/x/time/rate"

"github.com/coreos/etcd/etcdserver/stats"
"github.com/coreos/etcd/pkg/testutil"
"github.com/coreos/etcd/pkg/types"
Expand Down Expand Up @@ -113,6 +116,7 @@ func TestStreamReaderDialRequest(t *testing.T) {
peerID: types.ID(2),
tr: &Transport{streamRt: tr, ClusterID: types.ID(1), ID: types.ID(1)},
picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
ctx: context.Background(),
}
sr.dial(tt)

Expand Down Expand Up @@ -167,6 +171,7 @@ func TestStreamReaderDialResult(t *testing.T) {
tr: &Transport{streamRt: tr, ClusterID: types.ID(1)},
picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
errorc: make(chan error, 1),
ctx: context.Background(),
}

_, err := sr.dial(streamTypeMessage)
Expand All @@ -192,6 +197,7 @@ func TestStreamReaderStopOnDial(t *testing.T) {
errorc: make(chan error, 1),
typ: streamTypeMessage,
status: newPeerStatus(types.ID(2)),
rl: rate.NewLimiter(rate.Every(100*time.Millisecond), 1),
}
tr.onResp = func() {
// stop() waits for the run() goroutine to exit, but that exit
Expand Down Expand Up @@ -246,6 +252,7 @@ func TestStreamReaderDialDetectUnsupport(t *testing.T) {
peerID: types.ID(2),
tr: &Transport{streamRt: tr, ClusterID: types.ID(1)},
picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
ctx: context.Background(),
}

_, err := sr.dial(typ)
Expand Down Expand Up @@ -311,6 +318,7 @@ func TestStream(t *testing.T) {
status: newPeerStatus(types.ID(2)),
recvc: recvc,
propc: propc,
rl: rate.NewLimiter(rate.Every(100*time.Millisecond), 1),
}
sr.start()

Expand Down
16 changes: 14 additions & 2 deletions rafthttp/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/coreos/pkg/capnslog"
"github.com/xiang90/probing"
"golang.org/x/net/context"
"golang.org/x/time/rate"
)

var plog = logutil.NewMergeLogger(capnslog.NewPackageLogger("github.com/coreos/etcd", "rafthttp"))
Expand Down Expand Up @@ -94,8 +95,12 @@ type Transporter interface {
// User needs to call Start before calling other functions, and call
// Stop when the Transport is no longer used.
type Transport struct {
DialTimeout time.Duration // maximum duration before timing out dial of the request
TLSInfo transport.TLSInfo // TLS information used when creating connection
DialTimeout time.Duration // maximum duration before timing out dial of the request
// DialRetryFrequency defines the frequency of streamReader dial retrial attempts;
// a distinct rate limiter is created per every peer (default value: 10 events/sec)
DialRetryFrequency rate.Limit

TLSInfo transport.TLSInfo // TLS information used when creating connection

ID types.ID // local member ID
URLs types.URLs // local peer URLs
Expand Down Expand Up @@ -135,6 +140,13 @@ func (t *Transport) Start() error {
t.remotes = make(map[types.ID]*remote)
t.peers = make(map[types.ID]Peer)
t.prober = probing.NewProber(t.pipelineRt)

// If client didn't provide dial retry frequence, use the default
// (100ms backoff between attempts to create a new stream),
// so it doesn't bring too much overhead when retry.
if t.DialRetryFrequency == 0 {
t.DialRetryFrequency = rate.Every(100 * time.Millisecond)
}
return nil
}

Expand Down