From f6902ca769b289a7b4d988219dea006f6f4ba25c Mon Sep 17 00:00:00 2001
From: Dirk McCormick
Date: Mon, 8 Mar 2021 16:11:03 +0100
Subject: [PATCH] test: add TestBounceConnectionDataTransfer to verify that
 when the connection goes down and is then restored during a transfer, the
 deal completes successfully

---
 go.mod                                        |   4 +-
 go.sum                                        |   8 +-
 storagemarket/impl/clientstates/client_fsm.go |   2 +-
 storagemarket/integration_test.go             | 134 +++++++++++++++++-
 4 files changed, 140 insertions(+), 8 deletions(-)

diff --git a/go.mod b/go.mod
index 04f17268d..d9d9a4d18 100644
--- a/go.mod
+++ b/go.mod
@@ -6,7 +6,7 @@ require (
 	github.com/filecoin-project/go-address v0.0.3
 	github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2
 	github.com/filecoin-project/go-commp-utils v0.0.0-20201119054358-b88f7a96a434
-	github.com/filecoin-project/go-data-transfer v1.2.7
+	github.com/filecoin-project/go-data-transfer v1.2.9
 	github.com/filecoin-project/go-ds-versioning v0.1.0
 	github.com/filecoin-project/go-multistore v0.0.3
 	github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20
@@ -22,7 +22,7 @@ require (
 	github.com/ipfs/go-blockservice v0.1.4-0.20200624145336-a978cec6e834
 	github.com/ipfs/go-cid v0.0.7
 	github.com/ipfs/go-datastore v0.4.5
-	github.com/ipfs/go-graphsync v0.5.2
+	github.com/ipfs/go-graphsync v0.6.0
 	github.com/ipfs/go-ipfs-blockstore v1.0.3
 	github.com/ipfs/go-ipfs-blocksutil v0.0.1
 	github.com/ipfs/go-ipfs-chunker v0.0.5
diff --git a/go.sum b/go.sum
index b036ad4f2..4c10ee3fa 100644
--- a/go.sum
+++ b/go.sum
@@ -111,8 +111,8 @@ github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMX
 github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ=
 github.com/filecoin-project/go-data-transfer v1.0.1 h1:5sYKDbstyDsdJpVP4UGUW6+BgCNfgnH8hQgf0E3ZAno=
 github.com/filecoin-project/go-data-transfer v1.0.1/go.mod h1:UxvfUAY9v3ub0a21BSK9u3pB2aq30Y0KMsG+w9/ysyo=
-github.com/filecoin-project/go-data-transfer v1.2.7 h1:WE5Cpp9eMt5BDoWOVR64QegSn6bwHQaDzyyjVU377Y0=
-github.com/filecoin-project/go-data-transfer v1.2.7/go.mod h1:mvjZ+C3NkBX10JP4JMu27DCjUouHFjHwUGh+Xc4yvDA=
+github.com/filecoin-project/go-data-transfer v1.2.9 h1:k6oXrI/6AfNdEfUVFAx6LbVMGNmdWWq97BiaDnhS3JE=
+github.com/filecoin-project/go-data-transfer v1.2.9/go.mod h1:ps/AU2Ok4pf2oFevi5RmMr3JRAWcnSTA/Zag6VMygIc=
 github.com/filecoin-project/go-ds-versioning v0.1.0 h1:y/X6UksYTsK8TLCI7rttCKEvl8btmWxyFMEeeWGUxIQ=
 github.com/filecoin-project/go-ds-versioning v0.1.0/go.mod h1:mp16rb4i2QPmxBnmanUx8i/XANp+PFCCJWiAb+VW4/s=
 github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f h1:GxJzR3oRIMTPtpZ0b7QF8FKPK6/iPAc7trhlL5k/g+s=
@@ -304,8 +304,8 @@ github.com/ipfs/go-graphsync v0.4.2 h1:Y/jt5r619yj0LI7OLtGKh4jYm8goYUcuJ09y7TZ3z
 github.com/ipfs/go-graphsync v0.4.2/go.mod h1:/VmbZTUdUMTbNkgzAiCEucIIAU3BkLE2cZrDCVUhyi0=
 github.com/ipfs/go-graphsync v0.4.3 h1:2t+oCpufufs1oqChoWiIK7V5uC1XCtf06PK9nqMV6pM=
 github.com/ipfs/go-graphsync v0.4.3/go.mod h1:mPOwDYv128gf8gxPFgXnz4fNrSYPsWyqisJ7ych+XDY=
-github.com/ipfs/go-graphsync v0.5.2 h1:USD+daaSC+7pLHCxROThSaF6SF7WYXF03sjrta0rCfA=
-github.com/ipfs/go-graphsync v0.5.2/go.mod h1:e2ZxnClqBBYAtd901g9vXMJzS47labjAtOzsWtOzKNk=
+github.com/ipfs/go-graphsync v0.6.0 h1:x6UvDUGA7wjaKNqx5Vbo7FGT8aJ5ryYA0dMQ5jN3dF0=
+github.com/ipfs/go-graphsync v0.6.0/go.mod h1:e2ZxnClqBBYAtd901g9vXMJzS47labjAtOzsWtOzKNk=
 github.com/ipfs/go-hamt-ipld v0.1.1 h1:0IQdvwnAAUKmDE+PMJa5y1QiwOPHpI9+eAbQEEEYthk=
 github.com/ipfs/go-hamt-ipld v0.1.1/go.mod h1:1EZCr2v0jlCnhpa+aZ0JZYp8Tt2w16+JJOAVz17YcDk=
 github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08=
diff --git a/storagemarket/impl/clientstates/client_fsm.go b/storagemarket/impl/clientstates/client_fsm.go
index d124bf0a2..f7ed90d9b 100644
--- a/storagemarket/impl/clientstates/client_fsm.go
+++ b/storagemarket/impl/clientstates/client_fsm.go
@@ -79,7 +79,7 @@ var ClientEvents = fsm.Events{
 		FromMany(storagemarket.StorageDealStartDataTransfer, storagemarket.StorageDealTransferring).
 		To(storagemarket.StorageDealFailing).
 		Action(func(deal *storagemarket.ClientDeal, err error) error {
-			deal.Message = xerrors.Errorf("failed to initiate data transfer: %w", err).Error()
+			deal.Message = xerrors.Errorf("failed to complete data transfer: %w", err).Error()
 			return nil
 		}),
 
diff --git a/storagemarket/integration_test.go b/storagemarket/integration_test.go
index 70de7a17c..02b38e894 100644
--- a/storagemarket/integration_test.go
+++ b/storagemarket/integration_test.go
@@ -8,6 +8,8 @@ import (
 	"testing"
 	"time"
 
+	"github.com/filecoin-project/go-data-transfer/channelmonitor"
+
 	"github.com/ipfs/go-datastore"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -298,7 +300,15 @@ func TestRestartOnlyProviderDataTransfer(t *testing.T) {
 	td.DTNet1 = dtnet.NewFromLibp2pHost(td.Host1, dtClientNetRetry)
 
 	// Configure data-transfer to restart after stalling
-	restartConf := dtimpl.PushChannelRestartConfig(100*time.Millisecond, 10, 1, 200*time.Millisecond, 5)
+	restartConf := dtimpl.ChannelRestartConfig(channelmonitor.Config{
+		AcceptTimeout:          100 * time.Millisecond,
+		Interval:               100 * time.Millisecond,
+		MinBytesTransferred:    1,
+		ChecksPerInterval:      10,
+		RestartBackoff:         200 * time.Millisecond,
+		MaxConsecutiveRestarts: 5,
+		CompleteTimeout:        100 * time.Millisecond,
+	})
 	smState := testnodes.NewStorageMarketState()
 	depGen := dependencies.NewDepGenerator()
 	depGen.ClientNewDataTransfer = func(ds datastore.Batching, dir string, transferNetwork dtnet.DataTransferNetwork, transport datatransfer.Transport, counter *storedcounter.StoredCounter) (datatransfer.Manager, error) {
@@ -613,6 +623,128 @@ func TestRestartClient(t *testing.T) {
 	}
 }
 
+// TestBounceConnectionDataTransfer tests that when the connection is
+// broken and then restored, the data transfer will resume and the deal will
+// complete successfully.
+func TestBounceConnectionDataTransfer(t *testing.T) {
+	ctx := context.Background()
+	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
+	defer cancel()
+
+	// Configure data-transfer to make 5 attempts, backing off 1s each time
+	dtClientNetRetry := dtnet.RetryParameters(time.Second, time.Second, 5, 1)
+	td := shared_testutil.NewLibp2pTestData(ctx, t)
+	td.DTNet1 = dtnet.NewFromLibp2pHost(td.Host1, dtClientNetRetry)
+
+	// Configure data-transfer to automatically restart when the connection goes down
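+	// Note: these channelmonitor.Config fields are consumed by go-data-transfer's
+	// channel monitor, which (roughly) restarts the channel when the Accept or
+	// Complete responses don't arrive within their timeouts, or when fewer than
+	// MinBytesTransferred bytes move per check interval, retrying up to
+	// MaxConsecutiveRestarts times with RestartBackoff between attempts. See
+	// channelmonitor.Config in go-data-transfer for the authoritative field docs.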
+	restartConf := dtimpl.ChannelRestartConfig(channelmonitor.Config{
+		AcceptTimeout:          100 * time.Millisecond,
+		Interval:               100 * time.Millisecond,
+		MinBytesTransferred:    1,
+		ChecksPerInterval:      10,
+		RestartBackoff:         200 * time.Millisecond,
+		MaxConsecutiveRestarts: 5,
+		CompleteTimeout:        100 * time.Millisecond,
+	})
+	smState := testnodes.NewStorageMarketState()
+	depGen := dependencies.NewDepGenerator()
+	depGen.ClientNewDataTransfer = func(ds datastore.Batching, dir string, transferNetwork dtnet.DataTransferNetwork, transport datatransfer.Transport, counter *storedcounter.StoredCounter) (datatransfer.Manager, error) {
+		return dtimpl.NewDataTransfer(ds, dir, transferNetwork, transport, counter, restartConf)
+	}
+	deps := depGen.New(t, ctx, td, smState, "", noOpDelay, noOpDelay)
+	h := testharness.NewHarnessWithTestData(t, td, deps, true, false)
+
+	client := h.Client
+	clientHost := h.TestData.Host1.ID()
+	providerHost := h.TestData.Host2.ID()
+
+	// start client and provider
+	shared_testutil.StartAndWaitForReady(ctx, t, h.Provider)
+	shared_testutil.StartAndWaitForReady(ctx, t, h.Client)
+
+	// set the ask price so that any deal price will be accepted
+	err := h.Provider.SetAsk(big.NewInt(0), big.NewInt(0), 50000)
+	require.NoError(t, err)
+
+	// Bounce connection after this many bytes have been queued for sending
+	bounceConnectionAt := map[uint64]bool{
+		1000: false,
+		5000: false,
+	}
+	h.DTClient.SubscribeToEvents(func(event datatransfer.Event, channelState datatransfer.ChannelState) {
+		//t.Logf("dt-clnt %s: %s %s\n", datatransfer.Events[event.Code], datatransfer.Statuses[channelState.Status()], channelState.Message())
+		if event.Code == datatransfer.DataQueuedProgress {
+			//t.Logf("  > qued %d", channelState.Queued())
+
+			// Check if enough bytes have been queued that the connection
+			// should be bounced
+			for at, already := range bounceConnectionAt {
+				if channelState.Sent() > at && !already {
+					bounceConnectionAt[at] = true
+
+					// Break the connection
+					queued := channelState.Queued()
+					t.Logf("  breaking connection after sending %d bytes", queued)
+					h.TestData.MockNet.DisconnectPeers(clientHost, providerHost)
+					h.TestData.MockNet.UnlinkPeers(clientHost, providerHost)
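+					// Note: DisconnectPeers only closes the current
+					// connections; UnlinkPeers also removes the mocknet link
+					// so the peers can't transparently reconnect until
+					// LinkPeers is called again below.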
+
+					go func() {
+						// Restore the connection
+						time.Sleep(100 * time.Millisecond)
+						t.Logf("  restoring connection from bounce at %d bytes", queued)
+						h.TestData.MockNet.LinkPeers(clientHost, providerHost)
+					}()
+				}
+			}
+		}
+		//if event.Code == datatransfer.DataSentProgress {
+		//	t.Logf("  > sent %d", channelState.Sent())
+		//}
+	})
+	//h.DTProvider.SubscribeToEvents(func(event datatransfer.Event, channelState datatransfer.ChannelState) {
+	//	if event.Code == datatransfer.DataReceivedProgress {
+	//		t.Logf("  > rcvd %d", channelState.Received())
+	//	}
+	//})
+
+	result := h.ProposeStorageDeal(t, &storagemarket.DataRef{TransferType: storagemarket.TTGraphsync, Root: h.PayloadCid}, false, false)
+	proposalCid := result.ProposalCid
+	t.Log("storage deal proposed")
+
+	// This wait group will complete after the deal has completed on both the
+	// client and provider
+	expireWg := sync.WaitGroup{}
+	expireWg.Add(1)
+	_ = h.Provider.SubscribeToEvents(func(event storagemarket.ProviderEvent, deal storagemarket.MinerDeal) {
+		if event == storagemarket.ProviderEventDealExpired {
+			expireWg.Done()
+		}
+	})
+
+	expireWg.Add(1)
+	_ = client.SubscribeToEvents(func(event storagemarket.ClientEvent, deal storagemarket.ClientDeal) {
+		if event == storagemarket.ClientEventDealExpired {
+			expireWg.Done()
+		}
+	})
+
+	// Wait till both client and provider have completed the deal
+	waitGroupWait(ctx, &expireWg)
+	t.Log("---------- finished waiting for expected events-------")
+
+	// Ensure the client and provider both reached the final state
+	cd, err := client.GetLocalDeal(ctx, proposalCid)
+	require.NoError(t, err)
+	shared_testutil.AssertDealState(t, storagemarket.StorageDealExpired, cd.State)
+
+	providerDeals, err := h.Provider.ListLocalDeals()
+	require.NoError(t, err)
+
+	pd := providerDeals[0]
+	require.Equal(t, pd.ProposalCid, proposalCid)
+	shared_testutil.AssertDealState(t, storagemarket.StorageDealExpired, pd.State)
+}
+
 // TestCancelDataTransfer tests that cancelling a data transfer cancels the deal
 func TestCancelDataTransfer(t *testing.T) {
 	run := func(t *testing.T, cancelByClient bool, hasConnectivity bool) {
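Note on the helper used above: waitGroupWait is not added by this patch; it is an existing helper in storagemarket/integration_test.go. Assuming it simply waits for the WaitGroup to finish or for the context to expire, whichever comes first, a minimal sketch of that behaviour is:

	// waitGroupWait returns when the WaitGroup completes or the context is
	// cancelled, whichever happens first (sketch only; the real helper may differ).
	func waitGroupWait(ctx context.Context, wg *sync.WaitGroup) {
		done := make(chan struct{})
		go func() {
			wg.Wait()
			close(done)
		}()
		select {
		case <-done:
		case <-ctx.Done():
		}
	}

With this shape, the 10-second test context bounds the wait: if either side never reaches StorageDealExpired, the wait returns on context timeout and the assertions that follow fail instead of the test hanging indefinitely.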