Skip to content

Commit

Permalink
Don't add blocks to the datastore
Browse files Browse the repository at this point in the history
This leave the responsibility and choice to do so to the caller, typically go-blockservice.

This has several benefit:
- untangle the code
- allow to use an exchange as pure block retrieval
- avoid double add

Close ipfs/kubo#7956
  • Loading branch information
MichaelMure committed Jul 9, 2022
1 parent 8497368 commit d25b948
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 124 deletions.
84 changes: 45 additions & 39 deletions bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,60 +469,68 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks
func (bs *Bitswap) HasBlock(ctx context.Context, blk blocks.Block) error {
ctx, span := internal.StartSpan(ctx, "GetBlocks", trace.WithAttributes(attribute.String("Block", blk.Cid().String())))
defer span.End()
return bs.receiveBlocksFrom(ctx, "", []blocks.Block{blk}, nil, nil)
}

// TODO: Some of this stuff really only needs to be done when adding a block
// from the user, not when receiving it from the network.
// In case you run `git blame` on this comment, I'll save you some time: ask
// @whyrusleeping, I don't know the answers you seek.
func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []blocks.Block, haves []cid.Cid, dontHaves []cid.Cid) error {
select {
case <-bs.process.Closing():
return errors.New("bitswap is closed")
default:
}

wanted := blks
// NOTE: There exists the possibility for a race condition here. If a user
// creates a node, then adds it to the dagservice while another goroutine
// is waiting on a GetBlock for that object, they will receive a reference
// to the same node. We should address this soon, but i'm not going to do
// it now as it requires more thought and isn't causing immediate problems.

// Send all block keys (including duplicates) to any sessions that want them.
// (The duplicates are needed by sessions for accounting purposes)
bs.sm.ReceiveFrom(ctx, "", []cid.Cid{blk.Cid()}, nil, nil)

// Send wanted blocks to decision engine
bs.engine.ReceiveFrom("", []blocks.Block{blk})

// Publish the block to any Bitswap clients that had requested blocks.
// (the sessions use this pubsub mechanism to inform clients of incoming
// blocks)
bs.notif.Publish(blk)

// If blocks came from the network
if from != "" {
var notWanted []blocks.Block
wanted, notWanted = bs.sim.SplitWantedUnwanted(blks)
for _, b := range notWanted {
log.Debugf("[recv] block not in wantlist; cid=%s, peer=%s", b.Cid(), from)
// If the reprovider is enabled, send block to reprovider
if bs.provideEnabled {
select {
case bs.newBlocks <- blk.Cid():
// send block off to be reprovided
case <-bs.process.Closing():
return bs.process.Close()
}
}

// Put wanted blocks into blockstore
if len(wanted) > 0 {
err := bs.blockstore.PutMany(ctx, wanted)
if err != nil {
log.Errorf("Error writing %d blocks to datastore: %s", len(wanted), err)
return err
}
return nil
}

// receiveBlocksFrom process blocks received from the network
func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []blocks.Block, haves []cid.Cid, dontHaves []cid.Cid) error {
select {
case <-bs.process.Closing():
return errors.New("bitswap is closed")
default:
}

// NOTE: There exists the possiblity for a race condition here. If a user
// creates a node, then adds it to the dagservice while another goroutine
// is waiting on a GetBlock for that object, they will receive a reference
// to the same node. We should address this soon, but i'm not going to do
// it now as it requires more thought and isnt causing immediate problems.
wanted, notWanted := bs.sim.SplitWantedUnwanted(blks)
for _, b := range notWanted {
log.Debugf("[recv] block not in wantlist; cid=%s, peer=%s", b.Cid(), from)
}

allKs := make([]cid.Cid, 0, len(blks))
for _, b := range blks {
allKs = append(allKs, b.Cid())
}

// If the message came from the network
if from != "" {
// Inform the PeerManager so that we can calculate per-peer latency
combined := make([]cid.Cid, 0, len(allKs)+len(haves)+len(dontHaves))
combined = append(combined, allKs...)
combined = append(combined, haves...)
combined = append(combined, dontHaves...)
bs.pm.ResponseReceived(from, combined)
}
// Inform the PeerManager so that we can calculate per-peer latency
combined := make([]cid.Cid, 0, len(allKs)+len(haves)+len(dontHaves))
combined = append(combined, allKs...)
combined = append(combined, haves...)
combined = append(combined, dontHaves...)
bs.pm.ResponseReceived(from, combined)

// Send all block keys (including duplicates) to any sessions that want them.
// (The duplicates are needed by sessions for accounting purposes)
Expand Down Expand Up @@ -550,10 +558,8 @@ func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []b
}
}

if from != "" {
for _, b := range wanted {
log.Debugw("Bitswap.GetBlockRequest.End", "cid", b.Cid())
}
for _, b := range wanted {
log.Debugw("Bitswap.GetBlockRequest.End", "cid", b.Cid())
}

return nil
Expand Down
94 changes: 27 additions & 67 deletions bitswap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,18 @@ func getVirtualNetwork() tn.Network {
return tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
}

func addBlock(t *testing.T, ctx context.Context, inst testinstance.Instance, blk blocks.Block) {
t.Helper()
err := inst.Blockstore().Put(ctx, blk)
if err != nil {
t.Fatal(err)
}
err = inst.Exchange.HasBlock(ctx, blk)
if err != nil {
t.Fatal(err)
}
}

func TestClose(t *testing.T) {
vnet := getVirtualNetwork()
ig := testinstance.NewTestInstanceGenerator(vnet, nil, nil)
Expand Down Expand Up @@ -95,9 +107,7 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
hasBlock := peers[0]
defer hasBlock.Exchange.Close()

if err := hasBlock.Exchange.HasBlock(context.Background(), block); err != nil {
t.Fatal(err)
}
addBlock(t, context.Background(), hasBlock, block)

wantsBlock := peers[1]
defer wantsBlock.Exchange.Close()
Expand Down Expand Up @@ -128,9 +138,7 @@ func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) {
wantsBlock := ig.Next()
defer wantsBlock.Exchange.Close()

if err := hasBlock.Exchange.HasBlock(context.Background(), block); err != nil {
t.Fatal(err)
}
addBlock(t, context.Background(), hasBlock, block)

ctx, cancel := context.WithTimeout(context.Background(), 60*time.Millisecond)
defer cancel()
Expand Down Expand Up @@ -163,9 +171,7 @@ func TestUnwantedBlockNotAdded(t *testing.T) {
hasBlock := peers[0]
defer hasBlock.Exchange.Close()

if err := hasBlock.Exchange.HasBlock(context.Background(), block); err != nil {
t.Fatal(err)
}
addBlock(t, context.Background(), hasBlock, block)

doesNotWantBlock := peers[1]
defer doesNotWantBlock.Exchange.Close()
Expand Down Expand Up @@ -232,15 +238,6 @@ func TestPendingBlockAdded(t *testing.T) {
if !blkrecvd.Cid().Equals(lastBlock.Cid()) {
t.Fatal("received wrong block")
}

// Make sure Bitswap adds the block to the blockstore
blockInStore, err := instance.Blockstore().Has(context.Background(), lastBlock.Cid())
if err != nil {
t.Fatal(err)
}
if !blockInStore {
t.Fatal("Block was not added to block store")
}
}

func TestLargeSwarm(t *testing.T) {
Expand Down Expand Up @@ -307,10 +304,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
first := instances[0]
for _, b := range blocks {
blkeys = append(blkeys, b.Cid())
err := first.Exchange.HasBlock(ctx, b)
if err != nil {
t.Fatal(err)
}
addBlock(t, ctx, first, b)
}

t.Log("Distribute!")
Expand Down Expand Up @@ -341,16 +335,6 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
t.Fatal(err)
}
}

t.Log("Verify!")

for _, inst := range instances {
for _, b := range blocks {
if _, err := inst.Blockstore().Get(ctx, b.Cid()); err != nil {
t.Fatal(err)
}
}
}
}

// TODO simplify this test. get to the _essence_!
Expand Down Expand Up @@ -383,10 +367,7 @@ func TestSendToWantingPeer(t *testing.T) {
}

// peerB announces to the network that he has block alpha
err = peerB.Exchange.HasBlock(ctx, alpha)
if err != nil {
t.Fatal(err)
}
addBlock(t, ctx, peerB, alpha)

// At some point, peerA should get alpha (or timeout)
blkrecvd, ok := <-alphaPromise
Expand Down Expand Up @@ -445,10 +426,7 @@ func TestBasicBitswap(t *testing.T) {
blocks := bg.Blocks(1)

// First peer has block
err := instances[0].Exchange.HasBlock(context.Background(), blocks[0])
if err != nil {
t.Fatal(err)
}
addBlock(t, context.Background(), instances[0], blocks[0])

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
Expand Down Expand Up @@ -545,10 +523,7 @@ func TestDoubleGet(t *testing.T) {
t.Fatal("expected channel to be closed")
}

err = instances[0].Exchange.HasBlock(context.Background(), blocks[0])
if err != nil {
t.Fatal(err)
}
addBlock(t, context.Background(), instances[0], blocks[0])

select {
case blk, ok := <-blkch2:
Expand Down Expand Up @@ -708,10 +683,7 @@ func TestBitswapLedgerOneWay(t *testing.T) {

instances := ig.Instances(2)
blocks := bg.Blocks(1)
err := instances[0].Exchange.HasBlock(context.Background(), blocks[0])
if err != nil {
t.Fatal(err)
}
addBlock(t, context.Background(), instances[0], blocks[0])

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
Expand Down Expand Up @@ -760,19 +732,12 @@ func TestBitswapLedgerTwoWay(t *testing.T) {

instances := ig.Instances(2)
blocks := bg.Blocks(2)
err := instances[0].Exchange.HasBlock(context.Background(), blocks[0])
if err != nil {
t.Fatal(err)
}

err = instances[1].Exchange.HasBlock(context.Background(), blocks[1])
if err != nil {
t.Fatal(err)
}
addBlock(t, context.Background(), instances[0], blocks[0])
addBlock(t, context.Background(), instances[1], blocks[1])

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
_, err = instances[1].Exchange.GetBlock(ctx, blocks[0].Cid())
_, err := instances[1].Exchange.GetBlock(ctx, blocks[0].Cid())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -911,17 +876,14 @@ func TestTracer(t *testing.T) {
bitswap.WithTracer(wiretap)(instances[0].Exchange)

// First peer has block
err := instances[0].Exchange.HasBlock(context.Background(), blocks[0])
if err != nil {
t.Fatal(err)
}
addBlock(t, context.Background(), instances[0], blocks[0])

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

// Second peer broadcasts want for block CID
// (Received by first and third peers)
_, err = instances[1].Exchange.GetBlock(ctx, blocks[0].Cid())
_, err := instances[1].Exchange.GetBlock(ctx, blocks[0].Cid())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -995,10 +957,8 @@ func TestTracer(t *testing.T) {
// After disabling WireTap, no new messages are logged
bitswap.WithTracer(nil)(instances[0].Exchange)

err = instances[0].Exchange.HasBlock(context.Background(), blocks[1])
if err != nil {
t.Fatal(err)
}
addBlock(t, context.Background(), instances[0], blocks[1])

_, err = instances[1].Exchange.GetBlock(ctx, blocks[1].Cid())
if err != nil {
t.Fatal(err)
Expand Down
24 changes: 6 additions & 18 deletions bitswap_with_sessions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,7 @@ func TestFetchNotConnected(t *testing.T) {
// Provide 10 blocks on Peer A
blks := bgen.Blocks(10)
for _, block := range blks {
if err := other.Exchange.HasBlock(ctx, block); err != nil {
t.Fatal(err)
}
addBlock(t, ctx, other, block)
}

var cids []cid.Cid
Expand Down Expand Up @@ -243,9 +241,7 @@ func TestFetchAfterDisconnect(t *testing.T) {

firstBlks := blks[:5]
for _, block := range firstBlks {
if err := peerA.Exchange.HasBlock(ctx, block); err != nil {
t.Fatal(err)
}
addBlock(t, ctx, peerA, block)
}

// Request all blocks with Peer B
Expand Down Expand Up @@ -279,9 +275,7 @@ func TestFetchAfterDisconnect(t *testing.T) {
// Provide remaining blocks
lastBlks := blks[5:]
for _, block := range lastBlks {
if err := peerA.Exchange.HasBlock(ctx, block); err != nil {
t.Fatal(err)
}
addBlock(t, ctx, peerA, block)
}

// Peer B should call FindProviders() and find Peer A
Expand Down Expand Up @@ -334,9 +328,7 @@ func TestInterestCacheOverflow(t *testing.T) {
// wait to ensure that all the above cids were added to the sessions cache
time.Sleep(time.Millisecond * 50)

if err := b.Exchange.HasBlock(ctx, blks[0]); err != nil {
t.Fatal(err)
}
addBlock(t, ctx, b, blks[0])

select {
case blk, ok := <-zeroch:
Expand Down Expand Up @@ -381,9 +373,7 @@ func TestPutAfterSessionCacheEvict(t *testing.T) {
// wait to ensure that all the above cids were added to the sessions cache
time.Sleep(time.Millisecond * 50)

if err := a.Exchange.HasBlock(ctx, blks[17]); err != nil {
t.Fatal(err)
}
addBlock(t, ctx, a, blks[17])

select {
case <-blkch:
Expand Down Expand Up @@ -423,9 +413,7 @@ func TestMultipleSessions(t *testing.T) {
}

time.Sleep(time.Millisecond * 10)
if err := b.Exchange.HasBlock(ctx, blk); err != nil {
t.Fatal(err)
}
addBlock(t, ctx, b, blk)

select {
case <-blkch2:
Expand Down

0 comments on commit d25b948

Please sign in to comment.