Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bitswap: redo: don't re-provide blocks we've provided very recently #3253

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions blocks/blockstore/arc_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,19 +91,19 @@ func (b *arccache) Get(k key.Key) (blocks.Block, error) {
return bl, err
}

func (b *arccache) Put(bl blocks.Block) error {
func (b *arccache) Put(bl blocks.Block) (error, blocks.Block) {
if has, ok := b.hasCached(bl.Key()); ok && has {
return nil
return nil, nil
}

err := b.blockstore.Put(bl)
err, added := b.blockstore.Put(bl)
if err == nil {
b.arc.Add(bl.Key(), true)
}
return err
return err, added
}

func (b *arccache) PutMany(bs []blocks.Block) error {
func (b *arccache) PutMany(bs []blocks.Block) (error, []blocks.Block) {
var good []blocks.Block
for _, block := range bs {
// call put on block if result is inconclusive or we are sure that
Expand All @@ -112,14 +112,14 @@ func (b *arccache) PutMany(bs []blocks.Block) error {
good = append(good, block)
}
}
err := b.blockstore.PutMany(good)
err, added := b.blockstore.PutMany(good)
if err != nil {
return err
return err, nil
}
for _, block := range good {
b.arc.Add(block.Key(), true)
}
return nil
return nil, added
}

func (b *arccache) AllKeysChan(ctx context.Context) (<-chan key.Key, error) {
Expand Down
4 changes: 2 additions & 2 deletions blocks/blockstore/arc_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func TestHasRequestTriggersCache(t *testing.T) {
}

untrap(cd)
err := arc.Put(exampleBlock)
err, _ := arc.Put(exampleBlock)
if err != nil {
t.Fatal(err)
}
Expand All @@ -112,7 +112,7 @@ func TestGetFillsCache(t *testing.T) {

untrap(cd)

if err := arc.Put(exampleBlock); err != nil {
if err, _ := arc.Put(exampleBlock); err != nil {
t.Fatal(err)
}

Expand Down
26 changes: 16 additions & 10 deletions blocks/blockstore/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,12 @@ type Blockstore interface {
DeleteBlock(key.Key) error
Has(key.Key) (bool, error)
Get(key.Key) (blocks.Block, error)
Put(blocks.Block) error
PutMany([]blocks.Block) error

// Put and PutMany return the blocks(s) actually added to the
// blockstore. If a block already exists it will not be returned.

Put(blocks.Block) (error, blocks.Block)
PutMany([]blocks.Block) (error, []blocks.Block)

AllKeysChan(ctx context.Context) (<-chan key.Key, error)
}
Expand Down Expand Up @@ -109,23 +113,24 @@ func (bs *blockstore) Get(k key.Key) (blocks.Block, error) {
}
}

func (bs *blockstore) Put(block blocks.Block) error {
func (bs *blockstore) Put(block blocks.Block) (error, blocks.Block) {
k := block.Key().DsKey()

// Has is cheaper than Put, so see if we already have it
exists, err := bs.datastore.Has(k)
if err == nil && exists {
return nil // already stored.
return nil, nil // already stored.
}
return bs.datastore.Put(k, block.RawData())
return bs.datastore.Put(k, block.RawData()), block
}

func (bs *blockstore) PutMany(blocks []blocks.Block) error {
func (bs *blockstore) PutMany(blks []blocks.Block) (error, []blocks.Block) {
t, err := bs.datastore.Batch()
if err != nil {
return err
return err, nil
}
for _, b := range blocks {
added := make([]blocks.Block, 0, len(blks))
for _, b := range blks {
k := b.Key().DsKey()
exists, err := bs.datastore.Has(k)
if err == nil && exists {
Expand All @@ -134,10 +139,11 @@ func (bs *blockstore) PutMany(blocks []blocks.Block) error {

err = t.Put(k, b.RawData())
if err != nil {
return err
return err, nil
}
added = append(added, b)
}
return t.Commit()
return t.Commit(), added
}

func (bs *blockstore) Has(k key.Key) (bool, error) {
Expand Down
4 changes: 2 additions & 2 deletions blocks/blockstore/blockstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestPutThenGetBlock(t *testing.T) {
bs := NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore()))
block := blocks.NewBlock([]byte("some data"))

err := bs.Put(block)
err, _ := bs.Put(block)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -89,7 +89,7 @@ func newBlockStoreWithKeys(t *testing.T, d ds.Datastore, N int) (Blockstore, []k
keys := make([]key.Key, N)
for i := 0; i < N; i++ {
block := blocks.NewBlock([]byte(fmt.Sprintf("some data %d", i)))
err := bs.Put(block)
err, _ := bs.Put(block)
if err != nil {
t.Fatal(err)
}
Expand Down
16 changes: 8 additions & 8 deletions blocks/blockstore/bloom_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,31 +140,31 @@ func (b *bloomcache) Get(k key.Key) (blocks.Block, error) {
return b.blockstore.Get(k)
}

func (b *bloomcache) Put(bl blocks.Block) error {
func (b *bloomcache) Put(bl blocks.Block) (error, blocks.Block) {
if has, ok := b.hasCached(bl.Key()); ok && has {
return nil
return nil, nil
}

err := b.blockstore.Put(bl)
err, added := b.blockstore.Put(bl)
if err == nil {
b.bloom.AddTS([]byte(bl.Key()))
}
return err
return err, added
}

func (b *bloomcache) PutMany(bs []blocks.Block) error {
func (b *bloomcache) PutMany(bs []blocks.Block) (error, []blocks.Block) {
// bloom cache gives only conclusive resulty if key is not contained
// to reduce number of puts we need conclusive infomration if block is contained
// this means that PutMany can't be improved with bloom cache so we just
// just do a passthrough.
err := b.blockstore.PutMany(bs)
err, added := b.blockstore.PutMany(bs)
if err != nil {
return err
return err, nil
}
for _, bl := range bs {
b.bloom.AddTS([]byte(bl.Key()))
}
return nil
return nil, added
}

func (b *bloomcache) AllKeysChan(ctx context.Context) (<-chan key.Key, error) {
Expand Down
52 changes: 12 additions & 40 deletions blockservice/blockservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package blockservice

import (
"errors"
"fmt"

blocks "github.com/ipfs/go-ipfs/blocks"
"github.com/ipfs/go-ipfs/blocks/blockstore"
Expand Down Expand Up @@ -51,67 +50,40 @@ func New(bs blockstore.Blockstore, rem exchange.Interface) *BlockService {
// AddBlock adds a particular block to the service, Putting it into the datastore.
// TODO pass a context into this if the remote.HasBlock is going to remain here.
func (s *BlockService) AddObject(o Object) (*cid.Cid, error) {
// TODO: while this is a great optimization, we should think about the
// possibility of streaming writes directly to disk. If we can pass this object
// all the way down to the datastore without having to 'buffer' its data,
// we could implement a `WriteTo` method on it that could do a streaming write
// of the content, saving us (probably) considerable memory.
c := o.Cid()
has, err := s.Blockstore.Has(key.Key(c.Hash()))
err, added := s.Blockstore.Put(o)
if err != nil {
return nil, err
}

if has {
return c, nil
}

err = s.Blockstore.Put(o)
if err != nil {
return nil, err
if added == nil {
return o.Cid(), nil
}

if err := s.Exchange.HasBlock(o); err != nil {
return nil, errors.New("blockservice is closed")
}

return c, nil
return o.Cid(), nil
}

func (s *BlockService) AddObjects(bs []Object) ([]*cid.Cid, error) {
var toput []blocks.Block
var toputcids []*cid.Cid
cids := make([]*cid.Cid, 0, len(bs))
blks := make([]blocks.Block, 0, len(bs))
for _, b := range bs {
c := b.Cid()

has, err := s.Blockstore.Has(key.Key(c.Hash()))
if err != nil {
return nil, err
}

if has {
continue
}

toput = append(toput, b)
toputcids = append(toputcids, c)
cids = append(cids, b.Cid())
blks = append(blks, b)
}

err := s.Blockstore.PutMany(toput)
err, added := s.Blockstore.PutMany(blks)
if err != nil {
return nil, err
}

var ks []*cid.Cid
for _, o := range toput {
for _, o := range added {
if err := s.Exchange.HasBlock(o); err != nil {
return nil, fmt.Errorf("blockservice is closed (%s)", err)
return nil, errors.New("blockservice is closed")
}

c := o.(Object).Cid() // cast is safe, we created these
ks = append(ks, c)
}
return ks, nil
return cids, nil
}

// GetBlock retrieves a particular block from the service,
Expand Down
2 changes: 1 addition & 1 deletion exchange/bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ func (bs *Bitswap) HasBlock(blk blocks.Block) error {
default:
}

err := bs.blockstore.Put(blk)
err,_ := bs.blockstore.Put(blk)
if err != nil {
log.Errorf("Error writing block to datastore: %s", err)
return err
Expand Down
2 changes: 1 addition & 1 deletion exchange/bitswap/decision/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func TestPartnerWantsThenCancels(t *testing.T) {
bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
for _, letter := range alphabet {
block := blocks.NewBlock([]byte(letter))
if err := bs.Put(block); err != nil {
if err, _ := bs.Put(block); err != nil {
t.Fatal(err)
}
}
Expand Down
3 changes: 2 additions & 1 deletion exchange/offline/offline.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ func (e *offlineExchange) GetBlock(_ context.Context, k key.Key) (blocks.Block,

// HasBlock always returns nil.
func (e *offlineExchange) HasBlock(b blocks.Block) error {
return e.bs.Put(b)
err, _ := e.bs.Put(b)
return err
}

// Close always returns nil.
Expand Down
4 changes: 2 additions & 2 deletions test/integration/bitswap_wo_routing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestBitswapWithoutRouting(t *testing.T) {
block1 := blocks.NewBlock([]byte("block1"))

// put 1 before
if err := nodes[0].Blockstore.Put(block0); err != nil {
if err, _ := nodes[0].Blockstore.Put(block0); err != nil {
t.Fatal(err)
}

Expand All @@ -81,7 +81,7 @@ func TestBitswapWithoutRouting(t *testing.T) {
}

// put 1 after
if err := nodes[1].Blockstore.Put(block1); err != nil {
if err, _ := nodes[1].Blockstore.Put(block1); err != nil {
t.Fatal(err)
}

Expand Down