diff --git a/benchmarks/testinstance/testinstance.go b/benchmarks/testinstance/testinstance.go index 14b761b4..00843a8d 100644 --- a/benchmarks/testinstance/testinstance.go +++ b/benchmarks/testinstance/testinstance.go @@ -19,8 +19,6 @@ import ( "github.com/ipld/go-ipld-prime" peer "github.com/libp2p/go-libp2p-core/peer" - "github.com/filecoin-project/go-storedcounter" - datatransfer "github.com/filecoin-project/go-data-transfer" tn "github.com/filecoin-project/go-data-transfer/benchmarks/testnet" dtimpl "github.com/filecoin-project/go-data-transfer/impl" @@ -170,8 +168,7 @@ func NewInstance(ctx context.Context, net tn.Network, tempDir string, diskBasedD storer := storeutil.StorerForBlockstore(bstore) gs := gsimpl.New(ctx, gsNet, loader, storer, gsimpl.RejectAllRequestsByDefault()) transport := gstransport.NewTransport(p, gs) - dtCounter := storedcounter.New(dstore, datastore.NewKey("/data-transfers/counter")) - dt, err := dtimpl.NewDataTransfer(namespace.Wrap(dstore, datastore.NewKey("/data-transfers/transfers")), os.TempDir(), dtNet, transport, dtCounter) + dt, err := dtimpl.NewDataTransfer(namespace.Wrap(dstore, datastore.NewKey("/data-transfers/transfers")), os.TempDir(), dtNet, transport) if err != nil { return Instance{}, err } diff --git a/go.mod b/go.mod index f05bdc2b..b3a7053b 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,6 @@ go 1.14 require ( github.com/filecoin-project/go-ds-versioning v0.1.0 github.com/filecoin-project/go-statemachine v0.0.0-20200925024713-05bd7c71fbfe - github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b github.com/hannahhoward/cbor-gen-for v0.0.0-20200817222906-ea96cece81f1 github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e github.com/ipfs/go-block-format v0.0.2 diff --git a/go.sum b/go.sum index 62f7a5bf..4bff2c08 100644 --- a/go.sum +++ b/go.sum @@ -95,8 +95,6 @@ github.com/filecoin-project/go-statemachine v0.0.0-20200925024713-05bd7c71fbfe h github.com/filecoin-project/go-statemachine v0.0.0-20200925024713-05bd7c71fbfe/go.mod h1:FGwQgZAt2Gh5mjlwJUlVB62JeYdo+if0xWxSEfBD9ig= github.com/filecoin-project/go-statestore v0.1.0 h1:t56reH59843TwXHkMcwyuayStBIiWBRilQjQ+5IiwdQ= github.com/filecoin-project/go-statestore v0.1.0/go.mod h1:LFc9hD+fRxPqiHiaqUEZOinUJB4WARkRfNl10O7kTnI= -github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b h1:fkRZSPrYpk42PV3/lIXiL0LHetxde7vyYYvSsttQtfg= -github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b/go.mod h1:Q0GQOBtKf1oE10eSXSlhN45kDBdGvEcVOqMiffqX+N8= github.com/flynn/noise v0.0.0-20180327030543-2492fe189ae6 h1:u/UEqS66A5ckRmS4yNpjmVH56sVtS/RfclBAYocb4as= github.com/flynn/noise v0.0.0-20180327030543-2492fe189ae6/go.mod h1:1i71OnUq3iUe1ma7Lr6yG6/rjvM3emb6yoL7xLFzcVQ= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= diff --git a/impl/impl.go b/impl/impl.go index c1b6788f..8490f11e 100644 --- a/impl/impl.go +++ b/impl/impl.go @@ -14,8 +14,6 @@ import ( "github.com/libp2p/go-libp2p-core/peer" "golang.org/x/xerrors" - "github.com/filecoin-project/go-storedcounter" - datatransfer "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/go-data-transfer/channelmonitor" "github.com/filecoin-project/go-data-transfer/channels" @@ -39,10 +37,10 @@ type manager struct { channels *channels.Channels peerID peer.ID transport datatransfer.Transport - storedCounter *storedcounter.StoredCounter cidLists cidlists.CIDLists channelMonitor *channelmonitor.Monitor channelMonitorCfg *channelmonitor.Config + transferIDGen *timeCounter } type internalEvent struct { @@ -88,7 +86,7 @@ func ChannelRestartConfig(cfg channelmonitor.Config) DataTransferOption { } // NewDataTransfer initializes a new instance of a data transfer manager -func NewDataTransfer(ds datastore.Batching, cidListsDir string, dataTransferNetwork network.DataTransferNetwork, transport datatransfer.Transport, storedCounter *storedcounter.StoredCounter, options ...DataTransferOption) (datatransfer.Manager, error) { +func NewDataTransfer(ds datastore.Batching, cidListsDir string, dataTransferNetwork network.DataTransferNetwork, transport datatransfer.Transport, options ...DataTransferOption) (datatransfer.Manager, error) { m := &manager{ dataTransferNetwork: dataTransferNetwork, validatedTypes: registry.NewRegistry(), @@ -99,7 +97,7 @@ func NewDataTransfer(ds datastore.Batching, cidListsDir string, dataTransferNetw readySub: pubsub.New(readyDispatcher), peerID: dataTransferNetwork.ID(), transport: transport, - storedCounter: storedCounter, + transferIDGen: newTimeCounter(), } cidLists, err := cidlists.NewCIDLists(cidListsDir) diff --git a/impl/initiating_test.go b/impl/initiating_test.go index 4b13a009..3259817c 100644 --- a/impl/initiating_test.go +++ b/impl/initiating_test.go @@ -16,8 +16,6 @@ import ( "github.com/stretchr/testify/require" "golang.org/x/xerrors" - "github.com/filecoin-project/go-storedcounter" - datatransfer "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/go-data-transfer/channels" . "github.com/filecoin-project/go-data-transfer/impl" @@ -330,8 +328,7 @@ func TestDataTransferInitiating(t *testing.T) { h.network = testutil.NewFakeNetwork(h.peers[0]) h.transport = testutil.NewFakeTransport() h.ds = dss.MutexWrap(datastore.NewMapDatastore()) - h.storedCounter = storedcounter.New(h.ds, datastore.NewKey("counter")) - dt, err := NewDataTransfer(h.ds, os.TempDir(), h.network, h.transport, h.storedCounter, verify.options...) + dt, err := NewDataTransfer(h.ds, os.TempDir(), h.network, h.transport, verify.options...) require.NoError(t, err) testutil.StartAndWaitForReady(ctx, t, dt) h.dt = dt @@ -575,11 +572,10 @@ func TestDataTransferRestartInitiating(t *testing.T) { h.network = testutil.NewFakeNetwork(h.peers[0]) h.transport = testutil.NewFakeTransport() h.ds = dss.MutexWrap(datastore.NewMapDatastore()) - h.storedCounter = storedcounter.New(h.ds, datastore.NewKey("counter")) h.voucherValidator = testutil.NewStubbedValidator() // setup data transfer`` - dt, err := NewDataTransfer(h.ds, os.TempDir(), h.network, h.transport, h.storedCounter) + dt, err := NewDataTransfer(h.ds, os.TempDir(), h.network, h.transport) require.NoError(t, err) testutil.StartAndWaitForReady(ctx, t, dt) h.dt = dt @@ -620,7 +616,6 @@ type harness struct { network *testutil.FakeNetwork transport *testutil.FakeTransport ds datastore.Batching - storedCounter *storedcounter.StoredCounter dt datatransfer.Manager voucherValidator *testutil.StubbedValidator stor ipld.Node diff --git a/impl/integration_test.go b/impl/integration_test.go index 2e744df2..4abaad22 100644 --- a/impl/integration_test.go +++ b/impl/integration_test.go @@ -29,8 +29,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/filecoin-project/go-storedcounter" - datatransfer "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/go-data-transfer/channelmonitor" "github.com/filecoin-project/go-data-transfer/encoding" @@ -106,10 +104,10 @@ func TestRoundTrip(t *testing.T) { tp1 := gsData.SetupGSTransportHost1() tp2 := gsData.SetupGSTransportHost2() - dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1, gsData.StoredCounter1) + dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1) require.NoError(t, err) testutil.StartAndWaitForReady(ctx, t, dt1) - dt2, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2, gsData.StoredCounter2) + dt2, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2) require.NoError(t, err) testutil.StartAndWaitForReady(ctx, t, dt2) @@ -261,10 +259,10 @@ func TestMultipleRoundTripMultipleStores(t *testing.T) { tp1 := gsData.SetupGSTransportHost1() tp2 := gsData.SetupGSTransportHost2() - dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1, gsData.StoredCounter1) + dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1) require.NoError(t, err) testutil.StartAndWaitForReady(ctx, t, dt1) - dt2, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2, gsData.StoredCounter2) + dt2, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2) require.NoError(t, err) testutil.StartAndWaitForReady(ctx, t, dt2) @@ -383,7 +381,7 @@ func TestManyReceiversAtOnce(t *testing.T) { host1 := gsData.Host1 // initiator, data sender tp1 := gsData.SetupGSTransportHost1() - dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1, gsData.StoredCounter1) + dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1) require.NoError(t, err) testutil.StartAndWaitForReady(ctx, t, dt1) @@ -411,9 +409,7 @@ func TestManyReceiversAtOnce(t *testing.T) { dtDs := namespace.Wrap(ds, datastore.NewKey("datatransfer")) - storedCounter := storedcounter.New(ds, datastore.NewKey("counter")) - - receiver, err := NewDataTransfer(dtDs, os.TempDir(), dtnet, gsTransport, storedCounter) + receiver, err := NewDataTransfer(dtDs, os.TempDir(), dtnet, gsTransport) require.NoError(t, err) err = receiver.Start(gsData.Ctx) require.NoError(t, err) @@ -671,12 +667,12 @@ func TestAutoRestart(t *testing.T) { MaxConsecutiveRestarts: 5, CompleteTimeout: 100 * time.Millisecond, }) - initiator, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, initiatorGSTspt, gsData.StoredCounter1, restartConf) + initiator, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, initiatorGSTspt, restartConf) require.NoError(t, err) testutil.StartAndWaitForReady(ctx, t, initiator) defer initiator.Stop(ctx) - responder, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, responderGSTspt, gsData.StoredCounter2) + responder, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, responderGSTspt) require.NoError(t, err) testutil.StartAndWaitForReady(ctx, t, responder) defer responder.Stop(ctx) @@ -821,10 +817,10 @@ func TestRoundTripCancelledRequest(t *testing.T) { tp1 := gsData.SetupGSTransportHost1() tp2 := gsData.SetupGSTransportHost2() - dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1, gsData.StoredCounter1) + dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1) require.NoError(t, err) testutil.StartAndWaitForReady(ctx, t, dt1) - dt2, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2, gsData.StoredCounter2) + dt2, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2) require.NoError(t, err) testutil.StartAndWaitForReady(ctx, t, dt2) @@ -962,10 +958,10 @@ func TestSimulatedRetrievalFlow(t *testing.T) { tp1 := gsData.SetupGSTransportHost1() tp2 := gsData.SetupGSTransportHost2() - dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1, gsData.StoredCounter1) + dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1) require.NoError(t, err) testutil.StartAndWaitForReady(ctx, t, dt1) - dt2, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2, gsData.StoredCounter2) + dt2, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2) require.NoError(t, err) testutil.StartAndWaitForReady(ctx, t, dt2) var chid datatransfer.ChannelID @@ -1076,10 +1072,10 @@ func TestPauseAndResume(t *testing.T) { tp1 := gsData.SetupGSTransportHost1() tp2 := gsData.SetupGSTransportHost2() - dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1, gsData.StoredCounter1) + dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1) require.NoError(t, err) testutil.StartAndWaitForReady(ctx, t, dt1) - dt2, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2, gsData.StoredCounter2) + dt2, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2) require.NoError(t, err) testutil.StartAndWaitForReady(ctx, t, dt2) finished := make(chan struct{}, 2) @@ -1215,10 +1211,10 @@ func TestUnrecognizedVoucherRoundTrip(t *testing.T) { tp1 := gsData.SetupGSTransportHost1() tp2 := gsData.SetupGSTransportHost2() - dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1, gsData.StoredCounter1) + dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1) require.NoError(t, err) testutil.StartAndWaitForReady(ctx, t, dt1) - dt2, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2, gsData.StoredCounter2) + dt2, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2) require.NoError(t, err) testutil.StartAndWaitForReady(ctx, t, dt2) @@ -1285,14 +1281,14 @@ func TestDataTransferSubscribing(t *testing.T) { sv := testutil.NewStubbedValidator() sv.StubErrorPull() sv.StubErrorPush() - dt2, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2, gsData.StoredCounter2) + dt2, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2) require.NoError(t, err) testutil.StartAndWaitForReady(ctx, t, dt2) require.NoError(t, dt2.RegisterVoucherType(&testutil.FakeDTType{}, sv)) voucher := testutil.FakeDTType{Data: "applesauce"} baseCid := testutil.GenerateCids(1)[0] - dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1, gsData.StoredCounter1) + dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1) require.NoError(t, err) testutil.StartAndWaitForReady(ctx, t, dt1) subscribe1Calls := make(chan struct{}, 1) @@ -1424,7 +1420,7 @@ func TestRespondingToPushGraphsyncRequests(t *testing.T) { gsData.GsNet2.SetDelegate(gsr) tp1 := gsData.SetupGSTransportHost1() - dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1, gsData.StoredCounter1) + dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1) require.NoError(t, err) testutil.StartAndWaitForReady(ctx, t, dt1) voucherResult := testutil.NewFakeDTType() @@ -1512,7 +1508,7 @@ func TestResponseHookWhenExtensionNotFound(t *testing.T) { gs1 := gsData.SetupGraphsyncHost1() tp1 := tp.NewTransport(host1.ID(), gs1) - dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1, gsData.StoredCounter1) + dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1) require.NoError(t, err) testutil.StartAndWaitForReady(ctx, t, dt1) t.Run("when it's not our extension, does not error and does not validate", func(t *testing.T) { @@ -1555,7 +1551,7 @@ func TestRespondingToPullGraphsyncRequests(t *testing.T) { sv := testutil.NewStubbedValidator() sv.ExpectSuccessPull() - dt1, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2, gsData.StoredCounter2) + dt1, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2) require.NoError(t, err) testutil.StartAndWaitForReady(ctx, t, dt1) require.NoError(t, dt1.RegisterVoucherType(&testutil.FakeDTType{}, sv)) @@ -1587,7 +1583,7 @@ func TestRespondingToPullGraphsyncRequests(t *testing.T) { test: func(t *testing.T, gsData *testutil.GraphsyncTestingData, tp2 datatransfer.Transport, link ipld.Link, id datatransfer.TransferID, gsr *fakeGraphSyncReceiver) { sv := testutil.NewStubbedValidator() sv.ExpectErrorPull() - dt1, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2, gsData.StoredCounter2) + dt1, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2) require.NoError(t, err) testutil.StartAndWaitForReady(ctx, t, dt1) require.NoError(t, dt1.RegisterVoucherType(&testutil.FakeDTType{}, sv)) diff --git a/impl/responding_test.go b/impl/responding_test.go index bde23473..fbcdc48b 100644 --- a/impl/responding_test.go +++ b/impl/responding_test.go @@ -18,8 +18,6 @@ import ( "github.com/stretchr/testify/require" "golang.org/x/xerrors" - "github.com/filecoin-project/go-storedcounter" - datatransfer "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/go-data-transfer/channels" . "github.com/filecoin-project/go-data-transfer/impl" @@ -566,8 +564,7 @@ func TestDataTransferResponding(t *testing.T) { h.network = testutil.NewFakeNetwork(h.peers[0]) h.transport = testutil.NewFakeTransport() h.ds = dss.MutexWrap(datastore.NewMapDatastore()) - h.storedCounter = storedcounter.New(h.ds, datastore.NewKey("counter")) - dt, err := NewDataTransfer(h.ds, os.TempDir(), h.network, h.transport, h.storedCounter) + dt, err := NewDataTransfer(h.ds, os.TempDir(), h.network, h.transport) require.NoError(t, err) testutil.StartAndWaitForReady(ctx, t, dt) h.dt = dt @@ -989,8 +986,7 @@ func TestDataTransferRestartResponding(t *testing.T) { h.network = testutil.NewFakeNetwork(h.peers[0]) h.transport = testutil.NewFakeTransport() h.ds = dss.MutexWrap(datastore.NewMapDatastore()) - h.storedCounter = storedcounter.New(h.ds, datastore.NewKey("counter")) - dt, err := NewDataTransfer(h.ds, os.TempDir(), h.network, h.transport, h.storedCounter) + dt, err := NewDataTransfer(h.ds, os.TempDir(), h.network, h.transport) require.NoError(t, err) testutil.StartAndWaitForReady(ctx, t, dt) h.dt = dt @@ -1036,7 +1032,6 @@ type receiverHarness struct { sv *testutil.StubbedValidator srv *testutil.StubbedRevalidator ds datastore.Batching - storedCounter *storedcounter.StoredCounter dt datatransfer.Manager stor ipld.Node voucher *testutil.FakeDTType diff --git a/impl/restart_integration_test.go b/impl/restart_integration_test.go index d47f7b47..4c2c0d78 100644 --- a/impl/restart_integration_test.go +++ b/impl/restart_integration_test.go @@ -50,7 +50,7 @@ func TestRestartPush(t *testing.T) { require.NoError(t, rh.dt1.Stop(rh.testCtx)) time.Sleep(100 * time.Millisecond) tp1 := rh.gsData.SetupGSTransportHost1() - rh.dt1, err = NewDataTransfer(rh.gsData.DtDs1, rh.gsData.TempDir1, rh.gsData.DtNet1, tp1, rh.gsData.StoredCounter1) + rh.dt1, err = NewDataTransfer(rh.gsData.DtDs1, rh.gsData.TempDir1, rh.gsData.DtNet1, tp1) require.NoError(rh.t, err) require.NoError(rh.t, rh.dt1.RegisterVoucherType(&testutil.FakeDTType{}, rh.sv)) testutil.StartAndWaitForReady(rh.testCtx, t, rh.dt1) @@ -71,7 +71,7 @@ func TestRestartPush(t *testing.T) { require.NoError(t, rh.dt2.Stop(rh.testCtx)) time.Sleep(100 * time.Millisecond) tp2 := rh.gsData.SetupGSTransportHost2() - rh.dt2, err = NewDataTransfer(rh.gsData.DtDs2, rh.gsData.TempDir2, rh.gsData.DtNet2, tp2, rh.gsData.StoredCounter2) + rh.dt2, err = NewDataTransfer(rh.gsData.DtDs2, rh.gsData.TempDir2, rh.gsData.DtNet2, tp2) require.NoError(rh.t, err) require.NoError(rh.t, rh.dt2.RegisterVoucherType(&testutil.FakeDTType{}, rh.sv)) testutil.StartAndWaitForReady(rh.testCtx, t, rh.dt2) @@ -256,7 +256,7 @@ func TestRestartPull(t *testing.T) { require.NoError(t, rh.dt2.Stop(rh.testCtx)) time.Sleep(100 * time.Millisecond) tp2 := rh.gsData.SetupGSTransportHost2() - rh.dt2, err = NewDataTransfer(rh.gsData.DtDs2, rh.gsData.TempDir2, rh.gsData.DtNet2, tp2, rh.gsData.StoredCounter2) + rh.dt2, err = NewDataTransfer(rh.gsData.DtDs2, rh.gsData.TempDir2, rh.gsData.DtNet2, tp2) require.NoError(rh.t, err) require.NoError(rh.t, rh.dt2.RegisterVoucherType(&testutil.FakeDTType{}, rh.sv)) testutil.StartAndWaitForReady(rh.testCtx, t, rh.dt2) @@ -277,7 +277,7 @@ func TestRestartPull(t *testing.T) { require.NoError(t, rh.dt1.Stop(rh.testCtx)) time.Sleep(100 * time.Millisecond) tp1 := rh.gsData.SetupGSTransportHost1() - rh.dt1, err = NewDataTransfer(rh.gsData.DtDs1, rh.gsData.TempDir1, rh.gsData.DtNet1, tp1, rh.gsData.StoredCounter1) + rh.dt1, err = NewDataTransfer(rh.gsData.DtDs1, rh.gsData.TempDir1, rh.gsData.DtNet1, tp1) require.NoError(rh.t, err) require.NoError(rh.t, rh.dt1.RegisterVoucherType(&testutil.FakeDTType{}, rh.sv)) testutil.StartAndWaitForReady(rh.testCtx, t, rh.dt1) @@ -475,10 +475,10 @@ func newRestartHarness(t *testing.T) *restartHarness { tp1 := gsData.SetupGSTransportHost1() tp2 := gsData.SetupGSTransportHost2() - dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1, gsData.StoredCounter1) + dt1, err := NewDataTransfer(gsData.DtDs1, gsData.TempDir1, gsData.DtNet1, tp1) require.NoError(t, err) - dt2, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2, gsData.StoredCounter2) + dt2, err := NewDataTransfer(gsData.DtDs2, gsData.TempDir2, gsData.DtNet2, tp2) require.NoError(t, err) sv := testutil.NewStubbedValidator() diff --git a/impl/timecounter.go b/impl/timecounter.go new file mode 100644 index 00000000..c13904f0 --- /dev/null +++ b/impl/timecounter.go @@ -0,0 +1,21 @@ +package impl + +import ( + "sync/atomic" + "time" +) + +// timeCounter is used to generate a monotonically increasing sequence. +// It starts at the current time, then increments on each call to next. +type timeCounter struct { + counter uint64 +} + +func newTimeCounter() *timeCounter { + return &timeCounter{counter: uint64(time.Now().UnixNano())} +} + +func (tc *timeCounter) next() uint64 { + counter := atomic.AddUint64(&tc.counter, 1) + return counter +} diff --git a/impl/timecounter_test.go b/impl/timecounter_test.go new file mode 100644 index 00000000..31b4c403 --- /dev/null +++ b/impl/timecounter_test.go @@ -0,0 +1,48 @@ +package impl + +import ( + "sync" + "testing" + "time" +) + +func TestTimeCounter(t *testing.T) { + // Test that counter increases between restarts + tc1 := newTimeCounter() + time.Sleep(time.Millisecond) + tc2 := newTimeCounter() + tc1Next := tc1.next() + tc2Next := tc2.next() + if tc2Next <= tc1Next { + t.Fatal("counter should increase for each new counter generator", tc1Next, tc2Next) + } + + // Test that the counter always increases + for i := 0; i < 100; i++ { + first := tc1.next() + second := tc1.next() + if second <= first { + t.Fatal("counter should increase monotonically", first, second) + } + } + + // Test that the counter is thread-safe + count := 1000 + threads := 20 + counter := tc1.next() + var wg sync.WaitGroup + for i := 0; i < threads; i++ { + wg.Add(1) + go func() { + for i := 0; i < count; i++ { + tc1.next() + } + wg.Done() + }() + } + wg.Wait() + + if tc1.next() != counter+uint64(threads*count+1) { + t.Fatal("next() is not thread safe") + } +} diff --git a/impl/utils.go b/impl/utils.go index bf082f9f..a4ec5d34 100644 --- a/impl/utils.go +++ b/impl/utils.go @@ -32,11 +32,8 @@ var resumeTransportStatesResponder = statusList{ // newRequest encapsulates message creation func (m *manager) newRequest(ctx context.Context, selector ipld.Node, isPull bool, voucher datatransfer.Voucher, baseCid cid.Cid, to peer.ID) (datatransfer.Request, error) { - next, err := m.storedCounter.Next() - if err != nil { - return nil, err - } - tid := datatransfer.TransferID(next) + // Generate a new transfer ID for the request + tid := datatransfer.TransferID(m.transferIDGen.next()) return message.NewRequest(tid, false, isPull, voucher.Type(), voucher, baseCid, selector) } diff --git a/testutil/gstestdata.go b/testutil/gstestdata.go index df14d4ff..62fdc6db 100644 --- a/testutil/gstestdata.go +++ b/testutil/gstestdata.go @@ -38,8 +38,6 @@ import ( mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" "github.com/stretchr/testify/require" - "github.com/filecoin-project/go-storedcounter" - datatransfer "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/go-data-transfer/network" gstransport "github.com/filecoin-project/go-data-transfer/transport/graphsync" @@ -71,8 +69,6 @@ type GraphsyncTestingData struct { host2Protocols []protocol.ID Ctx context.Context Mn mocknet.Mocknet - StoredCounter1 *storedcounter.StoredCounter - StoredCounter2 *storedcounter.StoredCounter DtDs1 datastore.Batching DtDs2 datastore.Batching Bs1 bstore.Blockstore @@ -112,10 +108,6 @@ func NewGraphsyncTestingData(ctx context.Context, t *testing.T, host1Protocols [ gsData.Bs1 = bstore.NewBlockstore(namespace.Wrap(ds1, datastore.NewKey("blockstore"))) gsData.Bs2 = bstore.NewBlockstore(namespace.Wrap(ds2, datastore.NewKey("blockstore"))) - // make stored counters - gsData.StoredCounter1 = storedcounter.New(ds1, datastore.NewKey("counter")) - gsData.StoredCounter2 = storedcounter.New(ds2, datastore.NewKey("counter")) - gsData.DagService1 = merkledag.NewDAGService(blockservice.New(gsData.Bs1, offline.Exchange(gsData.Bs1))) gsData.DagService2 = merkledag.NewDAGService(blockservice.New(gsData.Bs2, offline.Exchange(gsData.Bs2)))