Don't add blocks to the datastore
This leaves the responsibility and the choice of doing so to the caller, typically go-blockservice.

This has several benefits:
- it untangles the code
- it allows using an exchange purely for block retrieval
- it avoids a double add, where both the caller and bitswap write the same block to the datastore

Closes ipfs/kubo#7956
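
To illustrate the new contract, here is a minimal sketch of the add path a caller now implements: store first, then announce. addBlock is a hypothetical helper written against the go-ipfs-blockstore and go-ipfs-exchange-interface APIs, not code from this commit; go-blockservice's actual implementation may differ.

package main

import (
	"context"

	blocks "github.com/ipfs/go-block-format"
	blockstore "github.com/ipfs/go-ipfs-blockstore"
	exchange "github.com/ipfs/go-ipfs-exchange-interface"
)

// addBlock sketches the division of labor after this change: the caller
// persists the block, then asks the exchange to announce it to peers.
func addBlock(ctx context.Context, blk blocks.Block, bstore blockstore.Blockstore, ex exchange.Interface) error {
	// The caller (e.g. go-blockservice) now owns the datastore write...
	if err := bstore.Put(ctx, blk); err != nil {
		return err
	}
	// ...and HasBlock only notifies sessions, the engine and the reprovider.
	return ex.HasBlock(ctx, blk)
}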
MichaelMure committed Jul 8, 2022
1 parent 8497368 commit 0855450
Showing 2 changed files with 48 additions and 48 deletions.
84 changes: 45 additions & 39 deletions bitswap.go
@@ -469,60 +469,68 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks
 func (bs *Bitswap) HasBlock(ctx context.Context, blk blocks.Block) error {
 	ctx, span := internal.StartSpan(ctx, "GetBlocks", trace.WithAttributes(attribute.String("Block", blk.Cid().String())))
 	defer span.End()
-	return bs.receiveBlocksFrom(ctx, "", []blocks.Block{blk}, nil, nil)
-}
 
-// TODO: Some of this stuff really only needs to be done when adding a block
-// from the user, not when receiving it from the network.
-// In case you run `git blame` on this comment, I'll save you some time: ask
-// @whyrusleeping, I don't know the answers you seek.
-func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []blocks.Block, haves []cid.Cid, dontHaves []cid.Cid) error {
 	select {
 	case <-bs.process.Closing():
 		return errors.New("bitswap is closed")
 	default:
 	}
 
-	wanted := blks
-	// NOTE: There exists the possibility for a race condition here. If a user
-	// creates a node, then adds it to the dagservice while another goroutine
-	// is waiting on a GetBlock for that object, they will receive a reference
-	// to the same node. We should address this soon, but I'm not going to do
-	// it now as it requires more thought and isn't causing immediate problems.
+	// Send all block keys (including duplicates) to any sessions that want them.
+	// (The duplicates are needed by sessions for accounting purposes)
+	bs.sm.ReceiveFrom(ctx, "", []cid.Cid{blk.Cid()}, nil, nil)
+
+	// Send wanted blocks to decision engine
+	bs.engine.ReceiveFrom("", []blocks.Block{blk})
+
+	// Publish the block to any Bitswap clients that had requested blocks.
+	// (the sessions use this pubsub mechanism to inform clients of incoming
+	// blocks)
+	bs.notif.Publish(blk)
 
-	// If blocks came from the network
-	if from != "" {
-		var notWanted []blocks.Block
-		wanted, notWanted = bs.sim.SplitWantedUnwanted(blks)
-		for _, b := range notWanted {
-			log.Debugf("[recv] block not in wantlist; cid=%s, peer=%s", b.Cid(), from)
+	// If the reprovider is enabled, send block to reprovider
+	if bs.provideEnabled {
+		select {
+		case bs.newBlocks <- blk.Cid():
+			// send block off to be reprovided
+		case <-bs.process.Closing():
+			return bs.process.Close()
 		}
 	}
 
-	// Put wanted blocks into blockstore
-	if len(wanted) > 0 {
-		err := bs.blockstore.PutMany(ctx, wanted)
-		if err != nil {
-			log.Errorf("Error writing %d blocks to datastore: %s", len(wanted), err)
-			return err
-		}
+	return nil
+}
+
+// receiveBlocksFrom processes blocks received from the network
+func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []blocks.Block, haves []cid.Cid, dontHaves []cid.Cid) error {
+	select {
+	case <-bs.process.Closing():
+		return errors.New("bitswap is closed")
+	default:
+	}
+
+	// NOTE: There exists the possibility for a race condition here. If a user
+	// creates a node, then adds it to the dagservice while another goroutine
+	// is waiting on a GetBlock for that object, they will receive a reference
+	// to the same node. We should address this soon, but I'm not going to do
+	// it now as it requires more thought and isn't causing immediate problems.
+	wanted, notWanted := bs.sim.SplitWantedUnwanted(blks)
+	for _, b := range notWanted {
+		log.Debugf("[recv] block not in wantlist; cid=%s, peer=%s", b.Cid(), from)
 	}
 
 	allKs := make([]cid.Cid, 0, len(blks))
 	for _, b := range blks {
 		allKs = append(allKs, b.Cid())
 	}
 
-	// If the message came from the network
-	if from != "" {
-		// Inform the PeerManager so that we can calculate per-peer latency
-		combined := make([]cid.Cid, 0, len(allKs)+len(haves)+len(dontHaves))
-		combined = append(combined, allKs...)
-		combined = append(combined, haves...)
-		combined = append(combined, dontHaves...)
-		bs.pm.ResponseReceived(from, combined)
-	}
+	// Inform the PeerManager so that we can calculate per-peer latency
+	combined := make([]cid.Cid, 0, len(allKs)+len(haves)+len(dontHaves))
+	combined = append(combined, allKs...)
+	combined = append(combined, haves...)
+	combined = append(combined, dontHaves...)
+	bs.pm.ResponseReceived(from, combined)
 
 	// Send all block keys (including duplicates) to any sessions that want them.
 	// (The duplicates are needed by sessions for accounting purposes)
@@ -550,10 +558,8 @@ func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []b
 		}
 	}
 
-	if from != "" {
-		for _, b := range wanted {
-			log.Debugw("Bitswap.GetBlockRequest.End", "cid", b.Cid())
-		}
+	for _, b := range wanted {
+		log.Debugw("Bitswap.GetBlockRequest.End", "cid", b.Cid())
 	}
 
 	return nil
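
Since receiveBlocksFrom no longer calls blockstore.PutMany, persisting fetched blocks also falls to the caller on the retrieval path. A minimal sketch of what that might look like, again with a hypothetical helper (getAndStore) rather than code from this commit:

package main

import (
	"context"

	blocks "github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"
	blockstore "github.com/ipfs/go-ipfs-blockstore"
	exchange "github.com/ipfs/go-ipfs-exchange-interface"
)

// getAndStore fetches a block through the exchange and persists it,
// since bitswap no longer writes received blocks to the datastore.
func getAndStore(ctx context.Context, c cid.Cid, bstore blockstore.Blockstore, ex exchange.Interface) (blocks.Block, error) {
	if blk, err := bstore.Get(ctx, c); err == nil {
		return blk, nil // already stored locally, no network round trip
	}
	blk, err := ex.GetBlock(ctx, c) // pure retrieval through the exchange
	if err != nil {
		return nil, err
	}
	if err := bstore.Put(ctx, blk); err != nil { // the caller owns the write
		return nil, err
	}
	return blk, nil
}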
12 changes: 3 additions & 9 deletions bitswap_test.go
@@ -95,6 +95,9 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
 	hasBlock := peers[0]
 	defer hasBlock.Exchange.Close()
 
+	if err := hasBlock.Blockstore().Put(context.Background(), block); err != nil {
+		t.Fatal(err)
+	}
 	if err := hasBlock.Exchange.HasBlock(context.Background(), block); err != nil {
 		t.Fatal(err)
 	}
@@ -232,15 +235,6 @@ func TestPendingBlockAdded(t *testing.T) {
 	if !blkrecvd.Cid().Equals(lastBlock.Cid()) {
 		t.Fatal("received wrong block")
 	}
-
-	// Make sure Bitswap adds the block to the blockstore
-	blockInStore, err := instance.Blockstore().Has(context.Background(), lastBlock.Cid())
-	if err != nil {
-		t.Fatal(err)
-	}
-	if !blockInStore {
-		t.Fatal("Block was not added to block store")
-	}
 }
 
 func TestLargeSwarm(t *testing.T) {
