Skip to content

Commit

Permalink
feat: update to go-data-transfer v1.2.9 (#508)
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkmc committed Mar 22, 2021
1 parent 46f73ec commit 18828bc
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 8 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/filecoin-project/go-address v0.0.3
github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2
github.com/filecoin-project/go-commp-utils v0.0.0-20201119054358-b88f7a96a434
github.com/filecoin-project/go-data-transfer v1.2.7
github.com/filecoin-project/go-data-transfer v1.2.9
github.com/filecoin-project/go-ds-versioning v0.1.0
github.com/filecoin-project/go-multistore v0.0.3
github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20
Expand All @@ -22,7 +22,7 @@ require (
github.com/ipfs/go-blockservice v0.1.4-0.20200624145336-a978cec6e834
github.com/ipfs/go-cid v0.0.7
github.com/ipfs/go-datastore v0.4.5
github.com/ipfs/go-graphsync v0.5.2
github.com/ipfs/go-graphsync v0.6.0
github.com/ipfs/go-ipfs-blockstore v1.0.3
github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-ipfs-chunker v0.0.5
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMX
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ=
github.com/filecoin-project/go-data-transfer v1.0.1 h1:5sYKDbstyDsdJpVP4UGUW6+BgCNfgnH8hQgf0E3ZAno=
github.com/filecoin-project/go-data-transfer v1.0.1/go.mod h1:UxvfUAY9v3ub0a21BSK9u3pB2aq30Y0KMsG+w9/ysyo=
github.com/filecoin-project/go-data-transfer v1.2.7 h1:WE5Cpp9eMt5BDoWOVR64QegSn6bwHQaDzyyjVU377Y0=
github.com/filecoin-project/go-data-transfer v1.2.7/go.mod h1:mvjZ+C3NkBX10JP4JMu27DCjUouHFjHwUGh+Xc4yvDA=
github.com/filecoin-project/go-data-transfer v1.2.9 h1:k6oXrI/6AfNdEfUVFAx6LbVMGNmdWWq97BiaDnhS3JE=
github.com/filecoin-project/go-data-transfer v1.2.9/go.mod h1:ps/AU2Ok4pf2oFevi5RmMr3JRAWcnSTA/Zag6VMygIc=
github.com/filecoin-project/go-ds-versioning v0.1.0 h1:y/X6UksYTsK8TLCI7rttCKEvl8btmWxyFMEeeWGUxIQ=
github.com/filecoin-project/go-ds-versioning v0.1.0/go.mod h1:mp16rb4i2QPmxBnmanUx8i/XANp+PFCCJWiAb+VW4/s=
github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f h1:GxJzR3oRIMTPtpZ0b7QF8FKPK6/iPAc7trhlL5k/g+s=
Expand Down Expand Up @@ -304,8 +304,8 @@ github.com/ipfs/go-graphsync v0.4.2 h1:Y/jt5r619yj0LI7OLtGKh4jYm8goYUcuJ09y7TZ3z
github.com/ipfs/go-graphsync v0.4.2/go.mod h1:/VmbZTUdUMTbNkgzAiCEucIIAU3BkLE2cZrDCVUhyi0=
github.com/ipfs/go-graphsync v0.4.3 h1:2t+oCpufufs1oqChoWiIK7V5uC1XCtf06PK9nqMV6pM=
github.com/ipfs/go-graphsync v0.4.3/go.mod h1:mPOwDYv128gf8gxPFgXnz4fNrSYPsWyqisJ7ych+XDY=
github.com/ipfs/go-graphsync v0.5.2 h1:USD+daaSC+7pLHCxROThSaF6SF7WYXF03sjrta0rCfA=
github.com/ipfs/go-graphsync v0.5.2/go.mod h1:e2ZxnClqBBYAtd901g9vXMJzS47labjAtOzsWtOzKNk=
github.com/ipfs/go-graphsync v0.6.0 h1:x6UvDUGA7wjaKNqx5Vbo7FGT8aJ5ryYA0dMQ5jN3dF0=
github.com/ipfs/go-graphsync v0.6.0/go.mod h1:e2ZxnClqBBYAtd901g9vXMJzS47labjAtOzsWtOzKNk=
github.com/ipfs/go-hamt-ipld v0.1.1 h1:0IQdvwnAAUKmDE+PMJa5y1QiwOPHpI9+eAbQEEEYthk=
github.com/ipfs/go-hamt-ipld v0.1.1/go.mod h1:1EZCr2v0jlCnhpa+aZ0JZYp8Tt2w16+JJOAVz17YcDk=
github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08=
Expand Down
2 changes: 1 addition & 1 deletion storagemarket/impl/clientstates/client_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ var ClientEvents = fsm.Events{
FromMany(storagemarket.StorageDealStartDataTransfer, storagemarket.StorageDealTransferring).
To(storagemarket.StorageDealFailing).
Action(func(deal *storagemarket.ClientDeal, err error) error {
deal.Message = xerrors.Errorf("failed to initiate data transfer: %w", err).Error()
deal.Message = xerrors.Errorf("failed to complete data transfer: %w", err).Error()
return nil
}),

Expand Down
133 changes: 132 additions & 1 deletion storagemarket/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/filecoin-project/go-commp-utils/pieceio"
"github.com/filecoin-project/go-commp-utils/pieceio/cario"
datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-data-transfer/channelmonitor"
dtimpl "github.com/filecoin-project/go-data-transfer/impl"
dtnet "github.com/filecoin-project/go-data-transfer/network"
"github.com/filecoin-project/go-state-types/abi"
Expand Down Expand Up @@ -298,7 +299,15 @@ func TestRestartOnlyProviderDataTransfer(t *testing.T) {
td.DTNet1 = dtnet.NewFromLibp2pHost(td.Host1, dtClientNetRetry)

// Configure data-transfer to restart after stalling
restartConf := dtimpl.PushChannelRestartConfig(100*time.Millisecond, 10, 1, 200*time.Millisecond, 5)
restartConf := dtimpl.ChannelRestartConfig(channelmonitor.Config{
AcceptTimeout: 200 * time.Millisecond,
Interval: 100 * time.Millisecond,
MinBytesTransferred: 1,
ChecksPerInterval: 10,
RestartBackoff: 200 * time.Millisecond,
MaxConsecutiveRestarts: 5,
CompleteTimeout: 200 * time.Millisecond,
})
smState := testnodes.NewStorageMarketState()
depGen := dependencies.NewDepGenerator()
depGen.ClientNewDataTransfer = func(ds datastore.Batching, dir string, transferNetwork dtnet.DataTransferNetwork, transport datatransfer.Transport, counter *storedcounter.StoredCounter) (datatransfer.Manager, error) {
Expand Down Expand Up @@ -613,6 +622,128 @@ func TestRestartClient(t *testing.T) {
}
}

// TestBounceConnectionDataTransfer tests that when the the connection is
// broken and then restarted, the data transfer will resume and the deal will
// complete successfully.
func TestBounceConnectionDataTransfer(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

// Configure data-transfer to make 5 attempts, backing off 1s each time
dtClientNetRetry := dtnet.RetryParameters(time.Second, time.Second, 5, 1)
td := shared_testutil.NewLibp2pTestData(ctx, t)
td.DTNet1 = dtnet.NewFromLibp2pHost(td.Host1, dtClientNetRetry)

// Configure data-transfer to automatically restart when connection goes down
restartConf := dtimpl.ChannelRestartConfig(channelmonitor.Config{
AcceptTimeout: 100 * time.Millisecond,
Interval: 100 * time.Millisecond,
MinBytesTransferred: 1,
ChecksPerInterval: 10,
RestartBackoff: 200 * time.Millisecond,
MaxConsecutiveRestarts: 5,
CompleteTimeout: 100 * time.Millisecond,
})
smState := testnodes.NewStorageMarketState()
depGen := dependencies.NewDepGenerator()
depGen.ClientNewDataTransfer = func(ds datastore.Batching, dir string, transferNetwork dtnet.DataTransferNetwork, transport datatransfer.Transport, counter *storedcounter.StoredCounter) (datatransfer.Manager, error) {
return dtimpl.NewDataTransfer(ds, dir, transferNetwork, transport, counter, restartConf)
}
deps := depGen.New(t, ctx, td, smState, "", noOpDelay, noOpDelay)
h := testharness.NewHarnessWithTestData(t, td, deps, true, false)

client := h.Client
clientHost := h.TestData.Host1.ID()
providerHost := h.TestData.Host2.ID()

// start client and provider
shared_testutil.StartAndWaitForReady(ctx, t, h.Provider)
shared_testutil.StartAndWaitForReady(ctx, t, h.Client)

// set ask price where we'll accept any price
err := h.Provider.SetAsk(big.NewInt(0), big.NewInt(0), 50000)
require.NoError(t, err)

// Bounce connection after this many bytes have been queued for sending
bounceConnectionAt := map[uint64]bool{
1000: false,
5000: false,
}
h.DTClient.SubscribeToEvents(func(event datatransfer.Event, channelState datatransfer.ChannelState) {
//t.Logf("dt-clnt %s: %s %s\n", datatransfer.Events[event.Code], datatransfer.Statuses[channelState.Status()], channelState.Message())
if event.Code == datatransfer.DataQueuedProgress {
//t.Logf(" > qued %d", channelState.Queued())

// Check if enough bytes have been queued that the connection
// should be bounced
for at, already := range bounceConnectionAt {
if channelState.Sent() > at && !already {
bounceConnectionAt[at] = true

// Break the connection
queued := channelState.Queued()
t.Logf(" breaking connection after sending %d bytes", queued)
h.TestData.MockNet.DisconnectPeers(clientHost, providerHost)
h.TestData.MockNet.UnlinkPeers(clientHost, providerHost)

go func() {
// Restore the connection
time.Sleep(100 * time.Millisecond)
t.Logf(" restoring connection from bounce at %d bytes", queued)
h.TestData.MockNet.LinkPeers(clientHost, providerHost)
}()
}
}
}
//if event.Code == datatransfer.DataSentProgress {
// t.Logf(" > sent %d", channelState.Sent())
//}
})
//h.DTProvider.SubscribeToEvents(func(event datatransfer.Event, channelState datatransfer.ChannelState) {
// if event.Code == datatransfer.DataReceivedProgress {
// t.Logf(" > rcvd %d", channelState.Received())
// }
//})

result := h.ProposeStorageDeal(t, &storagemarket.DataRef{TransferType: storagemarket.TTGraphsync, Root: h.PayloadCid}, false, false)
proposalCid := result.ProposalCid
t.Log("storage deal proposed")

// This wait group will complete after the deal has completed on both the
// client and provider
expireWg := sync.WaitGroup{}
expireWg.Add(1)
_ = h.Provider.SubscribeToEvents(func(event storagemarket.ProviderEvent, deal storagemarket.MinerDeal) {
if event == storagemarket.ProviderEventDealExpired {
expireWg.Done()
}
})

expireWg.Add(1)
_ = client.SubscribeToEvents(func(event storagemarket.ClientEvent, deal storagemarket.ClientDeal) {
if event == storagemarket.ClientEventDealExpired {
expireWg.Done()
}
})

// Wait till both client and provider have completed the deal
waitGroupWait(ctx, &expireWg)
t.Log("---------- finished waiting for expected events-------")

// Ensure the client and provider both reached the final state
cd, err := client.GetLocalDeal(ctx, proposalCid)
require.NoError(t, err)
shared_testutil.AssertDealState(t, storagemarket.StorageDealExpired, cd.State)

providerDeals, err := h.Provider.ListLocalDeals()
require.NoError(t, err)

pd := providerDeals[0]
require.Equal(t, pd.ProposalCid, proposalCid)
shared_testutil.AssertDealState(t, storagemarket.StorageDealExpired, pd.State)
}

// TestCancelDataTransfer tests that cancelling a data transfer cancels the deal
func TestCancelDataTransfer(t *testing.T) {
run := func(t *testing.T, cancelByClient bool, hasConnectivity bool) {
Expand Down

0 comments on commit 18828bc

Please sign in to comment.