Skip to content

Commit

Permalink
Automatically restart push channel (#127)
Browse files Browse the repository at this point in the history
* feat: latest go-graphsync

* feat: auto-restart connection for push data channels

* refactor: simplify push channel monitor config

* fix: more granular interval checking of data rates

* refactor: simplify push channel monitor naming
  • Loading branch information
dirkmc authored Dec 16, 2020
1 parent b945240 commit 288413b
Show file tree
Hide file tree
Showing 7 changed files with 922 additions and 27 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/ipfs/go-cid v0.0.7
github.com/ipfs/go-datastore v0.4.5
github.com/ipfs/go-ds-badger v0.2.3
github.com/ipfs/go-graphsync v0.5.0
github.com/ipfs/go-graphsync v0.5.2
github.com/ipfs/go-ipfs-blockstore v1.0.1
github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-ipfs-chunker v0.0.5
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,8 @@ github.com/ipfs/go-ds-badger v0.2.3/go.mod h1:pEYw0rgg3FIrywKKnL+Snr+w/LjJZVMTBR
github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc=
github.com/ipfs/go-ds-leveldb v0.4.1/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
github.com/ipfs/go-ds-leveldb v0.4.2/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
github.com/ipfs/go-graphsync v0.5.0 h1:iaByvxq88Ys1KcaQzTS1wmRhNsNEo3SaUiSGqTSbGmM=
github.com/ipfs/go-graphsync v0.5.0/go.mod h1:e2ZxnClqBBYAtd901g9vXMJzS47labjAtOzsWtOzKNk=
github.com/ipfs/go-graphsync v0.5.2 h1:USD+daaSC+7pLHCxROThSaF6SF7WYXF03sjrta0rCfA=
github.com/ipfs/go-graphsync v0.5.2/go.mod h1:e2ZxnClqBBYAtd901g9vXMJzS47labjAtOzsWtOzKNk=
github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08=
github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2IsfTduLl+422H6Rqw=
github.com/ipfs/go-ipfs-blockstore v0.1.4 h1:2SGI6U1B44aODevza8Rde3+dY30Pb+lbcObe1LETxOQ=
Expand Down
74 changes: 59 additions & 15 deletions impl/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,27 +24,30 @@ import (
"github.com/filecoin-project/go-data-transfer/encoding"
"github.com/filecoin-project/go-data-transfer/message"
"github.com/filecoin-project/go-data-transfer/network"
"github.com/filecoin-project/go-data-transfer/pushchannelmonitor"
"github.com/filecoin-project/go-data-transfer/registry"
)

var log = logging.Logger("dt-impl")

type manager struct {
dataTransferNetwork network.DataTransferNetwork
validatedTypes *registry.Registry
resultTypes *registry.Registry
revalidators *registry.Registry
transportConfigurers *registry.Registry
pubSub *pubsub.PubSub
readySub *pubsub.PubSub
channels *channels.Channels
peerID peer.ID
transport datatransfer.Transport
storedCounter *storedcounter.StoredCounter
channelRemoveTimeout time.Duration
reconnectsLk sync.RWMutex
reconnects map[datatransfer.ChannelID]chan struct{}
cidLists cidlists.CIDLists
dataTransferNetwork network.DataTransferNetwork
validatedTypes *registry.Registry
resultTypes *registry.Registry
revalidators *registry.Registry
transportConfigurers *registry.Registry
pubSub *pubsub.PubSub
readySub *pubsub.PubSub
channels *channels.Channels
peerID peer.ID
transport datatransfer.Transport
storedCounter *storedcounter.StoredCounter
channelRemoveTimeout time.Duration
reconnectsLk sync.RWMutex
reconnects map[datatransfer.ChannelID]chan struct{}
cidLists cidlists.CIDLists
pushChannelMonitor *pushchannelmonitor.Monitor
pushChannelMonitorCfg *pushchannelmonitor.Config
}

type internalEvent struct {
Expand Down Expand Up @@ -88,6 +91,28 @@ func ChannelRemoveTimeout(timeout time.Duration) DataTransferOption {
}
}

// PushChannelRestartConfig sets the configuration options for automatically
// restarting push channels
// - interval is the time over which minBytesSent must have been sent
// - checksPerInterval is the number of times to check per interval
// - minBytesSent is the minimum amount of data that must have been sent over the interval
// - restartBackoff is the time to wait before checking again for restarts
func PushChannelRestartConfig(
interval time.Duration,
checksPerInterval uint32,
minBytesSent uint64,
restartBackoff time.Duration,
) DataTransferOption {
return func(m *manager) {
m.pushChannelMonitorCfg = &pushchannelmonitor.Config{
Interval: interval,
ChecksPerInterval: checksPerInterval,
MinBytesSent: minBytesSent,
RestartBackoff: restartBackoff,
}
}
}

const defaultChannelRemoveTimeout = 1 * time.Hour

// NewDataTransfer initializes a new instance of a data transfer manager
Expand All @@ -106,6 +131,7 @@ func NewDataTransfer(ds datastore.Batching, cidListsDir string, dataTransferNetw
channelRemoveTimeout: defaultChannelRemoveTimeout,
reconnects: make(map[datatransfer.ChannelID]chan struct{}),
}

cidLists, err := cidlists.NewCIDLists(cidListsDir)
if err != nil {
return nil, err
Expand All @@ -116,9 +142,17 @@ func NewDataTransfer(ds datastore.Batching, cidListsDir string, dataTransferNetw
return nil, err
}
m.channels = channels

// Apply config options
for _, option := range options {
option(m)
}

// Start push channel monitor after applying config options as the config
// options may apply to the monitor
m.pushChannelMonitor = pushchannelmonitor.NewMonitor(m, m.pushChannelMonitorCfg)
m.pushChannelMonitor.Start()

return m, nil
}

Expand Down Expand Up @@ -161,6 +195,7 @@ func (m *manager) OnReady(ready datatransfer.ReadyFunc) {

// Stop terminates all data transfers and ends processing
func (m *manager) Stop(ctx context.Context) error {
m.pushChannelMonitor.Shutdown()
return m.transport.Shutdown(ctx)
}

Expand Down Expand Up @@ -196,11 +231,20 @@ func (m *manager) OpenPushDataChannel(ctx context.Context, requestTo peer.ID, vo
transportConfigurer(chid, voucher, m.transport)
}
m.dataTransferNetwork.Protect(requestTo, chid.String())
monitoredChan := m.pushChannelMonitor.AddChannel(chid)
if err := m.dataTransferNetwork.SendMessage(ctx, requestTo, req); err != nil {
err = fmt.Errorf("Unable to send request: %w", err)
_ = m.channels.Error(chid, err)

// If push channel monitoring is enabled, shutdown the monitor as it
// wasn't possible to start the data transfer
if monitoredChan != nil {
monitoredChan.Shutdown()
}

return chid, err
}

return chid, nil
}

Expand Down
107 changes: 101 additions & 6 deletions impl/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func TestRoundTrip(t *testing.T) {
for opens < 2 || completes < 2 || len(sentIncrements) < 21 || len(receivedIncrements) < 21 {
select {
case <-ctx.Done():
t.Fatal("Did not complete succcessful data transfer")
t.Fatal("Did not complete successful data transfer")
case <-finished:
completes++
case <-opened:
Expand Down Expand Up @@ -343,7 +343,7 @@ func TestMultipleRoundTripMultipleStores(t *testing.T) {
for opens < 2*data.requestCount || completes < 2*data.requestCount {
select {
case <-ctx.Done():
t.Fatal("Did not complete succcessful data transfer")
t.Fatal("Did not complete successful data transfer")
case <-finished:
completes++
case <-opened:
Expand Down Expand Up @@ -481,7 +481,7 @@ func TestManyReceiversAtOnce(t *testing.T) {
for opens < 2*data.receiverCount || completes < 2*data.receiverCount {
select {
case <-ctx.Done():
t.Fatal("Did not complete succcessful data transfer")
t.Fatal("Did not complete successful data transfer")
case <-finished:
completes++
case <-opened:
Expand All @@ -497,6 +497,101 @@ func TestManyReceiversAtOnce(t *testing.T) {
}
}

// TestPushRequestAutoRestart tests that if the connection for a push request
// goes down, it will automatically restart (given the right config options)
func TestPushRequestAutoRestart(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

gsData := testutil.NewGraphsyncTestingData(ctx, t, nil, nil)
netRetry := network.RetryParameters(time.Second, time.Second, 5, 1)
gsData.DtNet1 = network.NewFromLibp2pHost(gsData.Host1, netRetry)
host1 := gsData.Host1 // initiator, data sender
host2 := gsData.Host2 // data recipient

tp1 := gsData.SetupGSTransportHost1()
tp2 := gsData.SetupGSTransportHost2()

restartConf := PushChannelRestartConfig(100*time.Millisecond, 1, 10, 200*time.Millisecond)
dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1, gsData.StoredCounter1, restartConf)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt1)
dt2, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2, gsData.StoredCounter2)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt2)

received := make(chan struct{})
finished := make(chan struct{}, 2)
var subscriber datatransfer.Subscriber = func(event datatransfer.Event, channelState datatransfer.ChannelState) {
//t.Logf("%s: %s\n", datatransfer.Events[event.Code], datatransfer.Statuses[channelState.Status()])

if event.Code == datatransfer.DataReceived {
received <- struct{}{}
}

if channelState.Status() == datatransfer.Completed {
finished <- struct{}{}
}
}
dt1.SubscribeToEvents(subscriber)
dt2.SubscribeToEvents(subscriber)
voucher := testutil.FakeDTType{Data: "applesauce"}
sv := testutil.NewStubbedValidator()

sourceDagService := gsData.DagService1
destDagService := gsData.DagService2

root, origBytes := testutil.LoadUnixFSFile(ctx, t, sourceDagService, loremFile)
rootCid := root.(cidlink.Link).Cid

require.NoError(t, dt1.RegisterVoucherType(&testutil.FakeDTType{}, sv))
require.NoError(t, dt2.RegisterVoucherType(&testutil.FakeDTType{}, sv))
chid, err := dt1.OpenPushDataChannel(ctx, host2.ID(), &voucher, rootCid, gsData.AllSelector)
require.NoError(t, err)

// Wait for a block to be received
<-received

// Break connection
t.Logf("Breaking connection to peer")
require.NoError(t, gsData.Mn.UnlinkPeers(host1.ID(), host2.ID()))
require.NoError(t, gsData.Mn.DisconnectPeers(host1.ID(), host2.ID()))

t.Logf("Sleep for a second")
time.Sleep(1 * time.Second)

// Restore connection
t.Logf("Restore connection")
require.NoError(t, gsData.Mn.LinkAll())
time.Sleep(200 * time.Millisecond)
conn, err := gsData.Mn.ConnectPeers(host1.ID(), host2.ID())
require.NoError(t, err)
require.NotNil(t, conn)

t.Logf("Waiting for auto-restart on push channel %s", chid)

(func() {
finishedCount := 0
for {
select {
case <-ctx.Done():
t.Fatal("Did not complete successful data transfer")
return
case <-received:
case <-finished:
finishedCount++
if finishedCount == 2 {
return
}
}
}
})()

// Verify that the file was transferred to the destination node
testutil.VerifyHasFile(ctx, t, destDagService, root, origBytes)
}

func TestRoundTripCancelledRequest(t *testing.T) {
ctx := context.Background()
testCases := map[string]struct {
Expand Down Expand Up @@ -751,7 +846,7 @@ func TestSimulatedRetrievalFlow(t *testing.T) {
for providerFinished != nil || clientFinished != nil {
select {
case <-ctx.Done():
t.Fatal("Did not complete succcessful data transfer")
t.Fatal("Did not complete successful data transfer")
case <-providerFinished:
providerFinished = nil
case <-clientFinished:
Expand Down Expand Up @@ -868,7 +963,7 @@ func TestPauseAndResume(t *testing.T) {
pauseInitiators < 1 || pauseResponders < 1 || resumeInitiators < 1 || resumeResponders < 1 {
select {
case <-ctx.Done():
t.Fatal("Did not complete succcessful data transfer")
t.Fatal("Did not complete successful data transfer")
case <-finished:
completes++
case <-opened:
Expand Down Expand Up @@ -968,7 +1063,7 @@ func TestUnrecognizedVoucherRoundTrip(t *testing.T) {
for opens < 1 || finishes < 1 {
select {
case <-ctx.Done():
t.Fatal("Did not complete succcessful data transfer")
t.Fatal("Did not complete successful data transfer")
case <-finished:
finishes++
case <-opened:
Expand Down
6 changes: 3 additions & 3 deletions network/libp2p_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (impl *libp2pDataTransferNetwork) openStream(ctx context.Context, id peer.I
}

d := b.Duration()
log.Warnf("failed to open stream to %s on attempt %g of %g, waiting %s to try again, err: %w",
log.Warnf("failed to open stream to %s on attempt %g of %g, waiting %s to try again, err: %s",
id, nAttempts, impl.maxStreamOpenAttempts, d, err)

select {
Expand Down Expand Up @@ -183,14 +183,14 @@ func (dtnet *libp2pDataTransferNetwork) handleNewStream(s network.Stream) {
if err != io.EOF {
s.Reset() // nolint: errcheck,gosec
go dtnet.receiver.ReceiveError(err)
log.Debugf("graphsync net handleNewStream from %s error: %s", s.Conn().RemotePeer(), err)
log.Debugf("net handleNewStream from %s error: %s", s.Conn().RemotePeer(), err)
}
return
}

p := s.Conn().RemotePeer()
ctx := context.Background()
log.Debugf("graphsync net handleNewStream from %s", s.Conn().RemotePeer())
log.Debugf("net handleNewStream from %s", s.Conn().RemotePeer())

if received.IsRequest() {
receivedRequest, ok := received.(datatransfer.Request)
Expand Down
Loading

0 comments on commit 288413b

Please sign in to comment.