Skip to content

Commit

Permalink
feat: use random number instead of incrementing counter for transfer …
Browse files Browse the repository at this point in the history
…ID (#169)
  • Loading branch information
dirkmc authored Mar 23, 2021
1 parent bd97d64 commit 42e954d
Show file tree
Hide file tree
Showing 12 changed files with 107 additions and 71 deletions.
5 changes: 1 addition & 4 deletions benchmarks/testinstance/testinstance.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ import (
"github.com/ipld/go-ipld-prime"
peer "github.com/libp2p/go-libp2p-core/peer"

"github.com/filecoin-project/go-storedcounter"

datatransfer "github.com/filecoin-project/go-data-transfer"
tn "github.com/filecoin-project/go-data-transfer/benchmarks/testnet"
dtimpl "github.com/filecoin-project/go-data-transfer/impl"
Expand Down Expand Up @@ -170,8 +168,7 @@ func NewInstance(ctx context.Context, net tn.Network, tempDir string, diskBasedD
storer := storeutil.StorerForBlockstore(bstore)
gs := gsimpl.New(ctx, gsNet, loader, storer, gsimpl.RejectAllRequestsByDefault())
transport := gstransport.NewTransport(p, gs)
dtCounter := storedcounter.New(dstore, datastore.NewKey("/data-transfers/counter"))
dt, err := dtimpl.NewDataTransfer(namespace.Wrap(dstore, datastore.NewKey("/data-transfers/transfers")), os.TempDir(), dtNet, transport, dtCounter)
dt, err := dtimpl.NewDataTransfer(namespace.Wrap(dstore, datastore.NewKey("/data-transfers/transfers")), os.TempDir(), dtNet, transport)
if err != nil {
return Instance{}, err
}
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ go 1.14
require (
github.com/filecoin-project/go-ds-versioning v0.1.0
github.com/filecoin-project/go-statemachine v0.0.0-20200925024713-05bd7c71fbfe
github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b
github.com/hannahhoward/cbor-gen-for v0.0.0-20200817222906-ea96cece81f1
github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e
github.com/ipfs/go-block-format v0.0.2
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,6 @@ github.com/filecoin-project/go-statemachine v0.0.0-20200925024713-05bd7c71fbfe h
github.com/filecoin-project/go-statemachine v0.0.0-20200925024713-05bd7c71fbfe/go.mod h1:FGwQgZAt2Gh5mjlwJUlVB62JeYdo+if0xWxSEfBD9ig=
github.com/filecoin-project/go-statestore v0.1.0 h1:t56reH59843TwXHkMcwyuayStBIiWBRilQjQ+5IiwdQ=
github.com/filecoin-project/go-statestore v0.1.0/go.mod h1:LFc9hD+fRxPqiHiaqUEZOinUJB4WARkRfNl10O7kTnI=
github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b h1:fkRZSPrYpk42PV3/lIXiL0LHetxde7vyYYvSsttQtfg=
github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b/go.mod h1:Q0GQOBtKf1oE10eSXSlhN45kDBdGvEcVOqMiffqX+N8=
github.com/flynn/noise v0.0.0-20180327030543-2492fe189ae6 h1:u/UEqS66A5ckRmS4yNpjmVH56sVtS/RfclBAYocb4as=
github.com/flynn/noise v0.0.0-20180327030543-2492fe189ae6/go.mod h1:1i71OnUq3iUe1ma7Lr6yG6/rjvM3emb6yoL7xLFzcVQ=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
Expand Down
8 changes: 3 additions & 5 deletions impl/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-storedcounter"

datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-data-transfer/channelmonitor"
"github.com/filecoin-project/go-data-transfer/channels"
Expand All @@ -39,10 +37,10 @@ type manager struct {
channels *channels.Channels
peerID peer.ID
transport datatransfer.Transport
storedCounter *storedcounter.StoredCounter
cidLists cidlists.CIDLists
channelMonitor *channelmonitor.Monitor
channelMonitorCfg *channelmonitor.Config
transferIDGen *timeCounter
}

type internalEvent struct {
Expand Down Expand Up @@ -88,7 +86,7 @@ func ChannelRestartConfig(cfg channelmonitor.Config) DataTransferOption {
}

// NewDataTransfer initializes a new instance of a data transfer manager
func NewDataTransfer(ds datastore.Batching, cidListsDir string, dataTransferNetwork network.DataTransferNetwork, transport datatransfer.Transport, storedCounter *storedcounter.StoredCounter, options ...DataTransferOption) (datatransfer.Manager, error) {
func NewDataTransfer(ds datastore.Batching, cidListsDir string, dataTransferNetwork network.DataTransferNetwork, transport datatransfer.Transport, options ...DataTransferOption) (datatransfer.Manager, error) {
m := &manager{
dataTransferNetwork: dataTransferNetwork,
validatedTypes: registry.NewRegistry(),
Expand All @@ -99,7 +97,7 @@ func NewDataTransfer(ds datastore.Batching, cidListsDir string, dataTransferNetw
readySub: pubsub.New(readyDispatcher),
peerID: dataTransferNetwork.ID(),
transport: transport,
storedCounter: storedCounter,
transferIDGen: newTimeCounter(),
}

cidLists, err := cidlists.NewCIDLists(cidListsDir)
Expand Down
9 changes: 2 additions & 7 deletions impl/initiating_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ import (
"github.com/stretchr/testify/require"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-storedcounter"

datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-data-transfer/channels"
. "github.com/filecoin-project/go-data-transfer/impl"
Expand Down Expand Up @@ -330,8 +328,7 @@ func TestDataTransferInitiating(t *testing.T) {
h.network = testutil.NewFakeNetwork(h.peers[0])
h.transport = testutil.NewFakeTransport()
h.ds = dss.MutexWrap(datastore.NewMapDatastore())
h.storedCounter = storedcounter.New(h.ds, datastore.NewKey("counter"))
dt, err := NewDataTransfer(h.ds, os.TempDir(), h.network, h.transport, h.storedCounter, verify.options...)
dt, err := NewDataTransfer(h.ds, os.TempDir(), h.network, h.transport, verify.options...)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt)
h.dt = dt
Expand Down Expand Up @@ -575,11 +572,10 @@ func TestDataTransferRestartInitiating(t *testing.T) {
h.network = testutil.NewFakeNetwork(h.peers[0])
h.transport = testutil.NewFakeTransport()
h.ds = dss.MutexWrap(datastore.NewMapDatastore())
h.storedCounter = storedcounter.New(h.ds, datastore.NewKey("counter"))
h.voucherValidator = testutil.NewStubbedValidator()

// setup data transfer``
dt, err := NewDataTransfer(h.ds, os.TempDir(), h.network, h.transport, h.storedCounter)
dt, err := NewDataTransfer(h.ds, os.TempDir(), h.network, h.transport)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt)
h.dt = dt
Expand Down Expand Up @@ -620,7 +616,6 @@ type harness struct {
network *testutil.FakeNetwork
transport *testutil.FakeTransport
ds datastore.Batching
storedCounter *storedcounter.StoredCounter
dt datatransfer.Manager
voucherValidator *testutil.StubbedValidator
stor ipld.Node
Expand Down
48 changes: 22 additions & 26 deletions impl/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/filecoin-project/go-storedcounter"

datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-data-transfer/channelmonitor"
"github.com/filecoin-project/go-data-transfer/encoding"
Expand Down Expand Up @@ -106,10 +104,10 @@ func TestRoundTrip(t *testing.T) {
tp1 := gsData.SetupGSTransportHost1()
tp2 := gsData.SetupGSTransportHost2()

dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1, gsData.StoredCounter1)
dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt1)
dt2, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2, gsData.StoredCounter2)
dt2, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt2)

Expand Down Expand Up @@ -261,10 +259,10 @@ func TestMultipleRoundTripMultipleStores(t *testing.T) {
tp1 := gsData.SetupGSTransportHost1()
tp2 := gsData.SetupGSTransportHost2()

dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1, gsData.StoredCounter1)
dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt1)
dt2, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2, gsData.StoredCounter2)
dt2, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt2)

Expand Down Expand Up @@ -383,7 +381,7 @@ func TestManyReceiversAtOnce(t *testing.T) {
host1 := gsData.Host1 // initiator, data sender

tp1 := gsData.SetupGSTransportHost1()
dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1, gsData.StoredCounter1)
dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt1)

Expand Down Expand Up @@ -411,9 +409,7 @@ func TestManyReceiversAtOnce(t *testing.T) {

dtDs := namespace.Wrap(ds, datastore.NewKey("datatransfer"))

storedCounter := storedcounter.New(ds, datastore.NewKey("counter"))

receiver, err := NewDataTransfer(dtDs, os.TempDir(), dtnet, gsTransport, storedCounter)
receiver, err := NewDataTransfer(dtDs, os.TempDir(), dtnet, gsTransport)
require.NoError(t, err)
err = receiver.Start(gsData.Ctx)
require.NoError(t, err)
Expand Down Expand Up @@ -671,12 +667,12 @@ func TestAutoRestart(t *testing.T) {
MaxConsecutiveRestarts: 5,
CompleteTimeout: 100 * time.Millisecond,
})
initiator, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, initiatorGSTspt, gsData.StoredCounter1, restartConf)
initiator, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, initiatorGSTspt, restartConf)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, initiator)
defer initiator.Stop(ctx)

responder, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, responderGSTspt, gsData.StoredCounter2)
responder, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, responderGSTspt)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, responder)
defer responder.Stop(ctx)
Expand Down Expand Up @@ -821,10 +817,10 @@ func TestRoundTripCancelledRequest(t *testing.T) {
tp1 := gsData.SetupGSTransportHost1()
tp2 := gsData.SetupGSTransportHost2()

dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1, gsData.StoredCounter1)
dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt1)
dt2, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2, gsData.StoredCounter2)
dt2, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt2)

Expand Down Expand Up @@ -962,10 +958,10 @@ func TestSimulatedRetrievalFlow(t *testing.T) {
tp1 := gsData.SetupGSTransportHost1()
tp2 := gsData.SetupGSTransportHost2()

dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1, gsData.StoredCounter1)
dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt1)
dt2, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2, gsData.StoredCounter2)
dt2, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt2)
var chid datatransfer.ChannelID
Expand Down Expand Up @@ -1076,10 +1072,10 @@ func TestPauseAndResume(t *testing.T) {
tp1 := gsData.SetupGSTransportHost1()
tp2 := gsData.SetupGSTransportHost2()

dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1, gsData.StoredCounter1)
dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt1)
dt2, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2, gsData.StoredCounter2)
dt2, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt2)
finished := make(chan struct{}, 2)
Expand Down Expand Up @@ -1215,10 +1211,10 @@ func TestUnrecognizedVoucherRoundTrip(t *testing.T) {
tp1 := gsData.SetupGSTransportHost1()
tp2 := gsData.SetupGSTransportHost2()

dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1, gsData.StoredCounter1)
dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt1)
dt2, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2, gsData.StoredCounter2)
dt2, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt2)

Expand Down Expand Up @@ -1285,14 +1281,14 @@ func TestDataTransferSubscribing(t *testing.T) {
sv := testutil.NewStubbedValidator()
sv.StubErrorPull()
sv.StubErrorPush()
dt2, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2, gsData.StoredCounter2)
dt2, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt2)
require.NoError(t, dt2.RegisterVoucherType(&testutil.FakeDTType{}, sv))
voucher := testutil.FakeDTType{Data: "applesauce"}
baseCid := testutil.GenerateCids(1)[0]

dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1, gsData.StoredCounter1)
dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt1)
subscribe1Calls := make(chan struct{}, 1)
Expand Down Expand Up @@ -1424,7 +1420,7 @@ func TestRespondingToPushGraphsyncRequests(t *testing.T) {
gsData.GsNet2.SetDelegate(gsr)

tp1 := gsData.SetupGSTransportHost1()
dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1, gsData.StoredCounter1)
dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt1)
voucherResult := testutil.NewFakeDTType()
Expand Down Expand Up @@ -1512,7 +1508,7 @@ func TestResponseHookWhenExtensionNotFound(t *testing.T) {

gs1 := gsData.SetupGraphsyncHost1()
tp1 := tp.NewTransport(host1.ID(), gs1)
dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1, gsData.StoredCounter1)
dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt1)
t.Run("when it's not our extension, does not error and does not validate", func(t *testing.T) {
Expand Down Expand Up @@ -1555,7 +1551,7 @@ func TestRespondingToPullGraphsyncRequests(t *testing.T) {
sv := testutil.NewStubbedValidator()
sv.ExpectSuccessPull()

dt1, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2, gsData.StoredCounter2)
dt1, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt1)
require.NoError(t, dt1.RegisterVoucherType(&testutil.FakeDTType{}, sv))
Expand Down Expand Up @@ -1587,7 +1583,7 @@ func TestRespondingToPullGraphsyncRequests(t *testing.T) {
test: func(t *testing.T, gsData *testutil.GraphsyncTestingData, tp2 datatransfer.Transport, link ipld.Link, id datatransfer.TransferID, gsr *fakeGraphSyncReceiver) {
sv := testutil.NewStubbedValidator()
sv.ExpectErrorPull()
dt1, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2, gsData.StoredCounter2)
dt1, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt1)
require.NoError(t, dt1.RegisterVoucherType(&testutil.FakeDTType{}, sv))
Expand Down
9 changes: 2 additions & 7 deletions impl/responding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ import (
"github.com/stretchr/testify/require"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-storedcounter"

datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-data-transfer/channels"
. "github.com/filecoin-project/go-data-transfer/impl"
Expand Down Expand Up @@ -566,8 +564,7 @@ func TestDataTransferResponding(t *testing.T) {
h.network = testutil.NewFakeNetwork(h.peers[0])
h.transport = testutil.NewFakeTransport()
h.ds = dss.MutexWrap(datastore.NewMapDatastore())
h.storedCounter = storedcounter.New(h.ds, datastore.NewKey("counter"))
dt, err := NewDataTransfer(h.ds, os.TempDir(), h.network, h.transport, h.storedCounter)
dt, err := NewDataTransfer(h.ds, os.TempDir(), h.network, h.transport)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt)
h.dt = dt
Expand Down Expand Up @@ -989,8 +986,7 @@ func TestDataTransferRestartResponding(t *testing.T) {
h.network = testutil.NewFakeNetwork(h.peers[0])
h.transport = testutil.NewFakeTransport()
h.ds = dss.MutexWrap(datastore.NewMapDatastore())
h.storedCounter = storedcounter.New(h.ds, datastore.NewKey("counter"))
dt, err := NewDataTransfer(h.ds, os.TempDir(), h.network, h.transport, h.storedCounter)
dt, err := NewDataTransfer(h.ds, os.TempDir(), h.network, h.transport)
require.NoError(t, err)
testutil.StartAndWaitForReady(ctx, t, dt)
h.dt = dt
Expand Down Expand Up @@ -1036,7 +1032,6 @@ type receiverHarness struct {
sv *testutil.StubbedValidator
srv *testutil.StubbedRevalidator
ds datastore.Batching
storedCounter *storedcounter.StoredCounter
dt datatransfer.Manager
stor ipld.Node
voucher *testutil.FakeDTType
Expand Down
12 changes: 6 additions & 6 deletions impl/restart_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestRestartPush(t *testing.T) {
require.NoError(t, rh.dt1.Stop(rh.testCtx))
time.Sleep(100 * time.Millisecond)
tp1 := rh.gsData.SetupGSTransportHost1()
rh.dt1, err = NewDataTransfer(rh.gsData.DtDs1, rh.gsData.TempDir1, rh.gsData.DtNet1, tp1, rh.gsData.StoredCounter1)
rh.dt1, err = NewDataTransfer(rh.gsData.DtDs1, rh.gsData.TempDir1, rh.gsData.DtNet1, tp1)
require.NoError(rh.t, err)
require.NoError(rh.t, rh.dt1.RegisterVoucherType(&testutil.FakeDTType{}, rh.sv))
testutil.StartAndWaitForReady(rh.testCtx, t, rh.dt1)
Expand All @@ -71,7 +71,7 @@ func TestRestartPush(t *testing.T) {
require.NoError(t, rh.dt2.Stop(rh.testCtx))
time.Sleep(100 * time.Millisecond)
tp2 := rh.gsData.SetupGSTransportHost2()
rh.dt2, err = NewDataTransfer(rh.gsData.DtDs2, rh.gsData.TempDir2, rh.gsData.DtNet2, tp2, rh.gsData.StoredCounter2)
rh.dt2, err = NewDataTransfer(rh.gsData.DtDs2, rh.gsData.TempDir2, rh.gsData.DtNet2, tp2)
require.NoError(rh.t, err)
require.NoError(rh.t, rh.dt2.RegisterVoucherType(&testutil.FakeDTType{}, rh.sv))
testutil.StartAndWaitForReady(rh.testCtx, t, rh.dt2)
Expand Down Expand Up @@ -256,7 +256,7 @@ func TestRestartPull(t *testing.T) {
require.NoError(t, rh.dt2.Stop(rh.testCtx))
time.Sleep(100 * time.Millisecond)
tp2 := rh.gsData.SetupGSTransportHost2()
rh.dt2, err = NewDataTransfer(rh.gsData.DtDs2, rh.gsData.TempDir2, rh.gsData.DtNet2, tp2, rh.gsData.StoredCounter2)
rh.dt2, err = NewDataTransfer(rh.gsData.DtDs2, rh.gsData.TempDir2, rh.gsData.DtNet2, tp2)
require.NoError(rh.t, err)
require.NoError(rh.t, rh.dt2.RegisterVoucherType(&testutil.FakeDTType{}, rh.sv))
testutil.StartAndWaitForReady(rh.testCtx, t, rh.dt2)
Expand All @@ -277,7 +277,7 @@ func TestRestartPull(t *testing.T) {
require.NoError(t, rh.dt1.Stop(rh.testCtx))
time.Sleep(100 * time.Millisecond)
tp1 := rh.gsData.SetupGSTransportHost1()
rh.dt1, err = NewDataTransfer(rh.gsData.DtDs1, rh.gsData.TempDir1, rh.gsData.DtNet1, tp1, rh.gsData.StoredCounter1)
rh.dt1, err = NewDataTransfer(rh.gsData.DtDs1, rh.gsData.TempDir1, rh.gsData.DtNet1, tp1)
require.NoError(rh.t, err)
require.NoError(rh.t, rh.dt1.RegisterVoucherType(&testutil.FakeDTType{}, rh.sv))
testutil.StartAndWaitForReady(rh.testCtx, t, rh.dt1)
Expand Down Expand Up @@ -475,10 +475,10 @@ func newRestartHarness(t *testing.T) *restartHarness {
tp1 := gsData.SetupGSTransportHost1()
tp2 := gsData.SetupGSTransportHost2()

dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1, gsData.StoredCounter1)
dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1)
require.NoError(t, err)

dt2, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2, gsData.StoredCounter2)
dt2, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2)
require.NoError(t, err)

sv := testutil.NewStubbedValidator()
Expand Down
Loading

0 comments on commit 42e954d

Please sign in to comment.