From f71db14ad72b0084ab0501653215955822dc298c Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Tue, 12 Oct 2021 21:00:07 -0700 Subject: [PATCH 1/3] feat(graphsync): configure message parameters allow setting custom values for timing out messages and number of retries --- benchmarks/testnet/virtual.go | 2 +- impl/graphsync.go | 30 ++++++++++++++++++++- messagequeue/messagequeue.go | 44 ++++++++++++++++--------------- messagequeue/messagequeue_test.go | 17 +++++++----- network/interface.go | 8 +++++- network/libp2p_impl.go | 23 +++++++++++----- 6 files changed, 86 insertions(+), 38 deletions(-) diff --git a/benchmarks/testnet/virtual.go b/benchmarks/testnet/virtual.go index 656482bd..024f4dc9 100644 --- a/benchmarks/testnet/virtual.go +++ b/benchmarks/testnet/virtual.go @@ -200,7 +200,7 @@ func (mp *messagePasser) Reset() error { return nil } -func (nc *networkClient) NewMessageSender(ctx context.Context, p peer.ID) (gsnet.MessageSender, error) { +func (nc *networkClient) NewMessageSender(ctx context.Context, p peer.ID, _ gsnet.MessageSenderOpts) (gsnet.MessageSender, error) { return &messagePasser{ net: nc, target: p, diff --git a/impl/graphsync.go b/impl/graphsync.go index 8acb7dc6..ca609b1c 100644 --- a/impl/graphsync.go +++ b/impl/graphsync.go @@ -2,6 +2,7 @@ package graphsync import ( "context" + "time" logging "github.com/ipfs/go-log/v2" "github.com/ipfs/go-peertaskqueue" @@ -33,6 +34,8 @@ const maxRecursionDepth = 100 const defaultTotalMaxMemory = uint64(256 << 20) const defaultMaxMemoryPerPeer = uint64(16 << 20) const defaultMaxInProgressRequests = uint64(6) +const defaultMessageSendRetries = 10 +const defaultSendMessageTimeout = 10 * time.Minute // GraphSync is an instance of a GraphSync exchange that implements // the graphsync protocol. 
@@ -77,6 +80,8 @@ type graphsyncConfigOptions struct { registerDefaultValidator bool maxLinksPerOutgoingRequest uint64 maxLinksPerIncomingRequest uint64 + messageSendRetries int + sendMessageTimeout time.Duration } // Option defines the functional option type that can be used to configure @@ -171,6 +176,27 @@ func MaxLinksPerIncomingRequests(maxLinksPerIncomingRequest uint64) Option { } } +// MessageSendRetries sets the number of times graphsync will +// attempt to send a message before giving up. +// Lower to increase the speed at which an unresponsive peer is +// detected. +func MessageSendRetries(messageSendRetries int) Option { + return func(gs *graphsyncConfigOptions) { + gs.messageSendRetries = messageSendRetries + } +} + +// SendMessageTimeout sets the amount of time graphsync will wait +// for a message to go across the wire before giving up and +// trying again (up to max retries). +// Lower to increase the speed at which an unresponsive peer is +// detected. +func SendMessageTimeout(sendMessageTimeout time.Duration) Option { + return func(gs *graphsyncConfigOptions) { + gs.sendMessageTimeout = sendMessageTimeout + } +} + // New creates a new GraphSync Exchange on the given network, // and the given link loader+storer. 
func New(parent context.Context, network gsnet.GraphSyncNetwork, @@ -185,6 +211,8 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork, maxInProgressIncomingRequests: defaultMaxInProgressRequests, maxInProgressOutgoingRequests: defaultMaxInProgressRequests, registerDefaultValidator: true, + messageSendRetries: defaultMessageSendRetries, + sendMessageTimeout: defaultSendMessageTimeout, } for _, option := range options { option(gsConfig) @@ -207,7 +235,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork, } responseAllocator := allocator.NewAllocator(gsConfig.totalMaxMemoryResponder, gsConfig.maxMemoryPerPeerResponder) createMessageQueue := func(ctx context.Context, p peer.ID) peermanager.PeerQueue { - return messagequeue.New(ctx, p, network, responseAllocator) + return messagequeue.New(ctx, p, network, responseAllocator, gsConfig.messageSendRetries, gsConfig.sendMessageTimeout) } peerManager := peermanager.NewMessageManager(ctx, createMessageQueue) requestAllocator := allocator.NewAllocator(gsConfig.totalMaxMemoryRequestor, gsConfig.maxMemoryPerPeerRequestor) diff --git a/messagequeue/messagequeue.go b/messagequeue/messagequeue.go index de7db2d1..bd7df40b 100644 --- a/messagequeue/messagequeue.go +++ b/messagequeue/messagequeue.go @@ -17,8 +17,6 @@ import ( var log = logging.Logger("graphsync") -const maxRetries = 10 - // max block size is the maximum size for batching blocks in a single payload const maxBlockSize uint64 = 512 * 1024 @@ -38,7 +36,7 @@ type Event struct { // MessageNetwork is any network that can connect peers and generate a message // sender. 
type MessageNetwork interface { - NewMessageSender(context.Context, peer.ID) (gsnet.MessageSender, error) + NewMessageSender(context.Context, peer.ID, gsnet.MessageSenderOpts) (gsnet.MessageSender, error) ConnectTo(context.Context, peer.ID) error } @@ -58,24 +56,28 @@ type MessageQueue struct { done chan struct{} // internal do not touch outside go routines - sender gsnet.MessageSender - eventPublisher notifications.Publisher - buildersLk sync.RWMutex - builders []*gsmsg.Builder - nextBuilderTopic gsmsg.Topic - allocator Allocator + sender gsnet.MessageSender + eventPublisher notifications.Publisher + buildersLk sync.RWMutex + builders []*gsmsg.Builder + nextBuilderTopic gsmsg.Topic + allocator Allocator + maxRetries int + sendMessageTimeout time.Duration } // New creats a new MessageQueue. -func New(ctx context.Context, p peer.ID, network MessageNetwork, allocator Allocator) *MessageQueue { +func New(ctx context.Context, p peer.ID, network MessageNetwork, allocator Allocator, maxRetries int, sendMessageTimeout time.Duration) *MessageQueue { return &MessageQueue{ - ctx: ctx, - network: network, - p: p, - outgoingWork: make(chan struct{}, 1), - done: make(chan struct{}), - eventPublisher: notifications.NewPublisher(), - allocator: allocator, + ctx: ctx, + network: network, + p: p, + outgoingWork: make(chan struct{}, 1), + done: make(chan struct{}), + eventPublisher: notifications.NewPublisher(), + allocator: allocator, + maxRetries: maxRetries, + sendMessageTimeout: sendMessageTimeout, } } @@ -220,7 +222,7 @@ func (mq *MessageQueue) sendMessage() { return } - for i := 0; i < maxRetries; i++ { // try to send this message until we fail. + for i := 0; i < mq.maxRetries; i++ { // try to send this message until we fail. 
if mq.attemptSendAndRecovery(message, publisher) { return } @@ -232,7 +234,7 @@ func (mq *MessageQueue) initializeSender() error { if mq.sender != nil { return nil } - nsender, err := openSender(mq.ctx, mq.network, mq.p) + nsender, err := openSender(mq.ctx, mq.network, mq.p, mq.sendMessageTimeout) if err != nil { return err } @@ -277,7 +279,7 @@ func (mq *MessageQueue) attemptSendAndRecovery(message gsmsg.GraphSyncMessage, p return false } -func openSender(ctx context.Context, network MessageNetwork, p peer.ID) (gsnet.MessageSender, error) { +func openSender(ctx context.Context, network MessageNetwork, p peer.ID, sendTimeout time.Duration) (gsnet.MessageSender, error) { // allow ten minutes for connections this includes looking them up in the // dht dialing them, and handshaking conctx, cancel := context.WithTimeout(ctx, time.Minute*10) @@ -288,7 +290,7 @@ func openSender(ctx context.Context, network MessageNetwork, p peer.ID) (gsnet.M return nil, err } - nsender, err := network.NewMessageSender(ctx, p) + nsender, err := network.NewMessageSender(ctx, p, gsnet.MessageSenderOpts{SendTimeout: sendTimeout}) if err != nil { return nil, err } diff --git a/messagequeue/messagequeue_test.go b/messagequeue/messagequeue_test.go index 0425d936..18aa524d 100644 --- a/messagequeue/messagequeue_test.go +++ b/messagequeue/messagequeue_test.go @@ -21,6 +21,9 @@ import ( "github.com/ipfs/go-graphsync/testutil" ) +const sendMessageTimeout = 10 * time.Minute +const messageSendRetries = 10 + type fakeMessageNetwork struct { connectError error messageSenderError error @@ -32,7 +35,7 @@ func (fmn *fakeMessageNetwork) ConnectTo(context.Context, peer.ID) error { return fmn.connectError } -func (fmn *fakeMessageNetwork) NewMessageSender(context.Context, peer.ID) (gsnet.MessageSender, error) { +func (fmn *fakeMessageNetwork) NewMessageSender(context.Context, peer.ID, gsnet.MessageSenderOpts) (gsnet.MessageSender, error) { fmn.wait.Done() if fmn.messageSenderError == nil { return 
fmn.messageSender, nil @@ -68,7 +71,7 @@ func TestStartupAndShutdown(t *testing.T) { messageNetwork := &fakeMessageNetwork{nil, nil, messageSender, &waitGroup} allocator := allocator2.NewAllocator(1<<30, 1<<30) - messageQueue := New(ctx, peer, messageNetwork, allocator) + messageQueue := New(ctx, peer, messageNetwork, allocator, messageSendRetries, sendMessageTimeout) messageQueue.Startup() id := graphsync.RequestID(rand.Int31()) priority := graphsync.Priority(rand.Int31()) @@ -106,7 +109,7 @@ func TestShutdownDuringMessageSend(t *testing.T) { messageNetwork := &fakeMessageNetwork{nil, nil, messageSender, &waitGroup} allocator := allocator2.NewAllocator(1<<30, 1<<30) - messageQueue := New(ctx, peer, messageNetwork, allocator) + messageQueue := New(ctx, peer, messageNetwork, allocator, messageSendRetries, sendMessageTimeout) messageQueue.Startup() id := graphsync.RequestID(rand.Int31()) priority := graphsync.Priority(rand.Int31()) @@ -154,7 +157,7 @@ func TestProcessingNotification(t *testing.T) { messageNetwork := &fakeMessageNetwork{nil, nil, messageSender, &waitGroup} allocator := allocator2.NewAllocator(1<<30, 1<<30) - messageQueue := New(ctx, peer, messageNetwork, allocator) + messageQueue := New(ctx, peer, messageNetwork, allocator, messageSendRetries, sendMessageTimeout) messageQueue.Startup() waitGroup.Add(1) blks := testutil.GenerateBlocksOfSize(3, 128) @@ -210,7 +213,7 @@ func TestDedupingMessages(t *testing.T) { messageNetwork := &fakeMessageNetwork{nil, nil, messageSender, &waitGroup} allocator := allocator2.NewAllocator(1<<30, 1<<30) - messageQueue := New(ctx, peer, messageNetwork, allocator) + messageQueue := New(ctx, peer, messageNetwork, allocator, messageSendRetries, sendMessageTimeout) messageQueue.Startup() waitGroup.Add(1) id := graphsync.RequestID(rand.Int31()) @@ -282,7 +285,7 @@ func TestSendsVeryLargeBlocksResponses(t *testing.T) { messageNetwork := &fakeMessageNetwork{nil, nil, messageSender, &waitGroup} allocator := 
allocator2.NewAllocator(1<<30, 1<<30) - messageQueue := New(ctx, peer, messageNetwork, allocator) + messageQueue := New(ctx, peer, messageNetwork, allocator, messageSendRetries, sendMessageTimeout) messageQueue.Startup() waitGroup.Add(1) @@ -342,7 +345,7 @@ func TestSendsResponsesMemoryPressure(t *testing.T) { // use allocator with very small limit allocator := allocator2.NewAllocator(1000, 1000) - messageQueue := New(ctx, p, messageNetwork, allocator) + messageQueue := New(ctx, p, messageNetwork, allocator, messageSendRetries, sendMessageTimeout) messageQueue.Startup() waitGroup.Add(1) diff --git a/network/interface.go b/network/interface.go index caff448a..2277cc08 100644 --- a/network/interface.go +++ b/network/interface.go @@ -2,6 +2,7 @@ package network import ( "context" + "time" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/protocol" @@ -30,11 +31,16 @@ type GraphSyncNetwork interface { // ConnectTo establishes a connection to the given peer ConnectTo(context.Context, peer.ID) error - NewMessageSender(context.Context, peer.ID) (MessageSender, error) + NewMessageSender(context.Context, peer.ID, MessageSenderOpts) (MessageSender, error) ConnectionManager() ConnManager } +// MessageSenderOpts sets parameters for a message sender +type MessageSenderOpts struct { + SendTimeout time.Duration +} + // ConnManager provides the methods needed to protect and unprotect connections type ConnManager interface { Protect(peer.ID, string) diff --git a/network/libp2p_impl.go b/network/libp2p_impl.go index 985ccd7d..0b6a9a6c 100644 --- a/network/libp2p_impl.go +++ b/network/libp2p_impl.go @@ -38,7 +38,8 @@ type libp2pGraphSyncNetwork struct { } type streamMessageSender struct { - s network.Stream + s network.Stream + opts MessageSenderOpts } func (s *streamMessageSender) Close() error { @@ -50,14 +51,14 @@ func (s *streamMessageSender) Reset() error { } func (s *streamMessageSender) SendMsg(ctx context.Context, msg gsmsg.GraphSyncMessage) error { - 
return msgToStream(ctx, s.s, msg) + return msgToStream(ctx, s.s, msg, s.opts.SendTimeout) } -func msgToStream(ctx context.Context, s network.Stream, msg gsmsg.GraphSyncMessage) error { +func msgToStream(ctx context.Context, s network.Stream, msg gsmsg.GraphSyncMessage, timeout time.Duration) error { log.Debugf("Outgoing message with %d requests, %d responses, and %d blocks", len(msg.Requests()), len(msg.Responses()), len(msg.Blocks())) - deadline := time.Now().Add(sendMessageTimeout) + deadline := time.Now().Add(timeout) if dl, ok := ctx.Deadline(); ok { deadline = dl } @@ -81,13 +82,13 @@ func msgToStream(ctx context.Context, s network.Stream, msg gsmsg.GraphSyncMessa return nil } -func (gsnet *libp2pGraphSyncNetwork) NewMessageSender(ctx context.Context, p peer.ID) (MessageSender, error) { +func (gsnet *libp2pGraphSyncNetwork) NewMessageSender(ctx context.Context, p peer.ID, opts MessageSenderOpts) (MessageSender, error) { s, err := gsnet.newStreamToPeer(ctx, p) if err != nil { return nil, err } - return &streamMessageSender{s: s}, nil + return &streamMessageSender{s: s, opts: setDefaults(opts)}, nil } func (gsnet *libp2pGraphSyncNetwork) newStreamToPeer(ctx context.Context, p peer.ID) (network.Stream, error) { @@ -104,7 +105,7 @@ func (gsnet *libp2pGraphSyncNetwork) SendMessage( return err } - if err = msgToStream(ctx, s, outgoing); err != nil { + if err = msgToStream(ctx, s, outgoing, sendMessageTimeout); err != nil { _ = s.Reset() return err } @@ -173,3 +174,11 @@ func (nn *libp2pGraphSyncNotifee) OpenedStream(n network.Network, v network.Stre func (nn *libp2pGraphSyncNotifee) ClosedStream(n network.Network, v network.Stream) {} func (nn *libp2pGraphSyncNotifee) Listen(n network.Network, a ma.Multiaddr) {} func (nn *libp2pGraphSyncNotifee) ListenClose(n network.Network, a ma.Multiaddr) {} + +func setDefaults(opts MessageSenderOpts) MessageSenderOpts { + copy := opts + if opts.SendTimeout == 0 { + copy.SendTimeout = sendMessageTimeout + } + return copy +} From 
b858c29e549cba016976c391bd47d070780015e6 Mon Sep 17 00:00:00 2001 From: Hannah Howard Date: Tue, 12 Oct 2021 22:07:37 -0700 Subject: [PATCH 2/3] Update impl/graphsync.go Co-authored-by: Rod Vagg --- impl/graphsync.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/impl/graphsync.go b/impl/graphsync.go index ca609b1c..ec5b2f4f 100644 --- a/impl/graphsync.go +++ b/impl/graphsync.go @@ -180,6 +180,8 @@ func MaxLinksPerIncomingRequests(maxLinksPerIncomingRequest uint64) Option { // attempt to send a message before giving up. // Lower to increase the speed at which an unresponsive peer is // detected. +// +// If not set, a default of 10 is used. func MessageSendRetries(messageSendRetries int) Option { return func(gs *graphsyncConfigOptions) { gs.messageSendRetries = messageSendRetries From d9d222ba3e8224d68f6d8d8bc9e51e2cab871467 Mon Sep 17 00:00:00 2001 From: Hannah Howard Date: Tue, 12 Oct 2021 22:07:44 -0700 Subject: [PATCH 3/3] Update impl/graphsync.go Co-authored-by: Rod Vagg --- impl/graphsync.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/impl/graphsync.go b/impl/graphsync.go index ec5b2f4f..797202d8 100644 --- a/impl/graphsync.go +++ b/impl/graphsync.go @@ -193,6 +193,8 @@ func MessageSendRetries(messageSendRetries int) Option { // trying again (up to max retries). // Lower to increase the speed at which an unresponsive peer is // detected. +// +// If not set, a default of 10 minutes is used. func SendMessageTimeout(sendMessageTimeout time.Duration) Option { return func(gs *graphsyncConfigOptions) { gs.sendMessageTimeout = sendMessageTimeout