Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

blockstore: extract ARC cache from Bloom cache #3026

Merged
merged 3 commits into from
Aug 3, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 126 additions & 0 deletions blocks/blockstore/arc_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package blockstore

import (
"github.com/ipfs/go-ipfs/blocks"
key "github.com/ipfs/go-ipfs/blocks/key"
ds "gx/ipfs/QmTxLSvdhwg68WJimdS6icLPhZi28aTp6b7uihC2Yb47Xk/go-datastore"
lru "gx/ipfs/QmVYxfoJQiZijTgPNHCHgHELvQpbsJNTg6Crmc3dQkj3yy/golang-lru"
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
)

type arccache struct {
arc *lru.ARCCache
blockstore Blockstore
}

func arcCached(bs Blockstore, lruSize int) (*arccache, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • newArcCacheDS

arc, err := lru.NewARC(lruSize)
if err != nil {
return nil, err
}

return &arccache{arc: arc, blockstore: bs}, nil
}

func (b *arccache) DeleteBlock(k key.Key) error {
if has, ok := b.hasCached(k); ok && !has {
return ErrNotFound
}

b.arc.Remove(k) // Invalidate cache before deleting.
err := b.blockstore.DeleteBlock(k)
switch err {
case nil, ds.ErrNotFound, ErrNotFound:
b.arc.Add(k, false)
return nil
default:
return err
}
}

// if ok == false has is inconclusive
// if ok == true then has respons to question: is it contained
func (b *arccache) hasCached(k key.Key) (has bool, ok bool) {
if k == "" {
// Return cache invalid so the call to blockstore happens
// in case of invalid key and correct error is created.
return false, false
}

h, ok := b.arc.Get(k)
if ok {
return h.(bool), true
}
return false, false
}

func (b *arccache) Has(k key.Key) (bool, error) {
if has, ok := b.hasCached(k); ok {
return has, nil
}

res, err := b.blockstore.Has(k)
if err == nil {
b.arc.Add(k, res)
}
return res, err
}

func (b *arccache) Get(k key.Key) (blocks.Block, error) {
if has, ok := b.hasCached(k); ok && !has {
return nil, ErrNotFound
}

bl, err := b.blockstore.Get(k)
if bl == nil && err == ErrNotFound {
b.arc.Add(k, false)
} else if bl != nil {
b.arc.Add(k, true)
}
return bl, err
}

func (b *arccache) Put(bl blocks.Block) error {
if has, ok := b.hasCached(bl.Key()); ok && has {
return nil
}

err := b.blockstore.Put(bl)
if err == nil {
b.arc.Add(bl.Key(), true)
}
return err
}

func (b *arccache) PutMany(bs []blocks.Block) error {
var good []blocks.Block
for _, block := range bs {
if has, ok := b.hasCached(block.Key()); !ok || (ok && !has) {
good = append(good, block)
}
}
err := b.blockstore.PutMany(bs)
if err != nil {
return err
}
for _, block := range bs {
b.arc.Add(block.Key(), true)
}
return nil
}

func (b *arccache) AllKeysChan(ctx context.Context) (<-chan key.Key, error) {
return b.blockstore.AllKeysChan(ctx)
}

func (b *arccache) GCLock() Unlocker {
return b.blockstore.(GCBlockstore).GCLock()
}

func (b *arccache) PinLock() Unlocker {
return b.blockstore.(GCBlockstore).PinLock()
}

func (b *arccache) GCRequested() bool {
return b.blockstore.(GCBlockstore).GCRequested()
}
67 changes: 67 additions & 0 deletions blocks/blockstore/arc_cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package blockstore

import (
"github.com/ipfs/go-ipfs/blocks"
"testing"

ds "gx/ipfs/QmTxLSvdhwg68WJimdS6icLPhZi28aTp6b7uihC2Yb47Xk/go-datastore"
syncds "gx/ipfs/QmTxLSvdhwg68WJimdS6icLPhZi28aTp6b7uihC2Yb47Xk/go-datastore/sync"
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
)

func testArcCached(bs GCBlockstore, ctx context.Context) (*arccache, error) {
if ctx == nil {
ctx = context.TODO()
}
opts := DefaultCacheOpts()
opts.HasBloomFilterSize = 0
opts.HasBloomFilterHashes = 0
bbs, err := CachedBlockstore(bs, ctx, opts)
if err == nil {
return bbs.(*arccache), nil
} else {
return nil, err
}
}

func TestRemoveCacheEntryOnDelete(t *testing.T) {
b := blocks.NewBlock([]byte("foo"))
cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()}
bs := NewBlockstore(syncds.MutexWrap(cd))
cachedbs, err := testArcCached(bs, nil)
if err != nil {
t.Fatal(err)
}
cachedbs.Put(b)

cd.Lock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

whats with locking here? you dont need to lock around a variable instantiation

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah why is this lock here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The race detector was giving me an error otherwise for some reason, I can tinker with it more to find out the exact reason.

Maybe line 47 should be locked instead.

writeHitTheDatastore := false
cd.Unlock()

cd.SetFunc(func() {
writeHitTheDatastore = true
})

cachedbs.DeleteBlock(b.Key())
cachedbs.Put(b)
if !writeHitTheDatastore {
t.Fail()
}
}

func TestElideDuplicateWrite(t *testing.T) {
cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()}
bs := NewBlockstore(syncds.MutexWrap(cd))
cachedbs, err := testArcCached(bs, nil)
if err != nil {
t.Fatal(err)
}

b1 := blocks.NewBlock([]byte("foo"))

cachedbs.Put(b1)
cd.SetFunc(func() {
t.Fatal("write hit the datastore")
})
cachedbs.Put(b1)
}
46 changes: 6 additions & 40 deletions blocks/blockstore/bloom_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package blockstore
import (
"github.com/ipfs/go-ipfs/blocks"
key "github.com/ipfs/go-ipfs/blocks/key"
ds "gx/ipfs/QmTxLSvdhwg68WJimdS6icLPhZi28aTp6b7uihC2Yb47Xk/go-datastore"
lru "gx/ipfs/QmVYxfoJQiZijTgPNHCHgHELvQpbsJNTg6Crmc3dQkj3yy/golang-lru"
bloom "gx/ipfs/QmWQ2SJisXwcCLsUXLwYCKSfyExXjFRW2WbBH5sqCUnwX5/bbloom"
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"

Expand All @@ -13,16 +11,12 @@ import (

// bloomCached returns Blockstore that caches Has requests using Bloom filter
// Size is size of bloom filter in bytes
func bloomCached(bs Blockstore, ctx context.Context, bloomSize, hashCount, lruSize int) (*bloomcache, error) {
func bloomCached(bs Blockstore, ctx context.Context, bloomSize, hashCount int) (*bloomcache, error) {
bl, err := bloom.New(float64(bloomSize), float64(hashCount))
if err != nil {
return nil, err
}
arc, err := lru.NewARC(lruSize)
if err != nil {
return nil, err
}
bc := &bloomcache{blockstore: bs, bloom: bl, arc: arc}
bc := &bloomcache{blockstore: bs, bloom: bl}
bc.Invalidate()
go bc.Rebuild(ctx)

Expand All @@ -33,7 +27,6 @@ type bloomcache struct {
bloom *bloom.Bloom
active int32

arc *lru.ARCCache
// This chan is only used for testing to wait for bloom to enable
rebuildChan chan struct{}
blockstore Blockstore
Expand Down Expand Up @@ -84,17 +77,7 @@ func (b *bloomcache) DeleteBlock(k key.Key) error {
return ErrNotFound
}

b.arc.Remove(k) // Invalidate cache before deleting.
err := b.blockstore.DeleteBlock(k)
switch err {
case nil:
b.arc.Add(k, false)
case ds.ErrNotFound, ErrNotFound:
b.arc.Add(k, false)
default:
return err
}
return nil
return b.blockstore.DeleteBlock(k)
}

// if ok == false has is inconclusive
Expand All @@ -111,38 +94,23 @@ func (b *bloomcache) hasCached(k key.Key) (has bool, ok bool) {
return false, true
}
}
h, ok := b.arc.Get(k)
if ok {
return h.(bool), ok
} else {
return false, false
}
return false, false
}

func (b *bloomcache) Has(k key.Key) (bool, error) {
if has, ok := b.hasCached(k); ok {
return has, nil
}

res, err := b.blockstore.Has(k)
if err == nil {
b.arc.Add(k, res)
}
return res, err
return b.blockstore.Has(k)
}

func (b *bloomcache) Get(k key.Key) (blocks.Block, error) {
if has, ok := b.hasCached(k); ok && !has {
return nil, ErrNotFound
}

bl, err := b.blockstore.Get(k)
if bl == nil && err == ErrNotFound {
b.arc.Add(k, false)
} else if bl != nil {
b.arc.Add(k, true)
}
return bl, err
return b.blockstore.Get(k)
}

func (b *bloomcache) Put(bl blocks.Block) error {
Expand All @@ -153,7 +121,6 @@ func (b *bloomcache) Put(bl blocks.Block) error {
err := b.blockstore.Put(bl)
if err == nil {
b.bloom.AddTS([]byte(bl.Key()))
b.arc.Add(bl.Key(), true)
}
return err
}
Expand All @@ -169,7 +136,6 @@ func (b *bloomcache) PutMany(bs []blocks.Block) error {
if err == nil {
for _, block := range bs {
b.bloom.AddTS([]byte(block.Key()))
b.arc.Add(block.Key(), true)
}
}
return err
Expand Down
49 changes: 2 additions & 47 deletions blocks/blockstore/bloom_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ func testBloomCached(bs GCBlockstore, ctx context.Context) (*bloomcache, error)
ctx = context.TODO()
}
opts := DefaultCacheOpts()
opts.HasARCCacheSize = 0
bbs, err := CachedBlockstore(bs, ctx, opts)
if err == nil {
return bbs.(*bloomcache), nil
Expand All @@ -29,56 +30,10 @@ func testBloomCached(bs GCBlockstore, ctx context.Context) (*bloomcache, error)

func TestReturnsErrorWhenSizeNegative(t *testing.T) {
bs := NewBlockstore(syncds.MutexWrap(ds.NewMapDatastore()))
_, err := bloomCached(bs, context.TODO(), 100, 1, -1)
_, err := bloomCached(bs, context.TODO(), -1, 1)
if err == nil {
t.Fail()
}
_, err = bloomCached(bs, context.TODO(), -1, 1, 100)
if err == nil {
t.Fail()
}
}

func TestRemoveCacheEntryOnDelete(t *testing.T) {
b := blocks.NewBlock([]byte("foo"))
cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()}
bs := NewBlockstore(syncds.MutexWrap(cd))
cachedbs, err := testBloomCached(bs, nil)
if err != nil {
t.Fatal(err)
}
cachedbs.Put(b)

cd.Lock()
writeHitTheDatastore := false
cd.Unlock()

cd.SetFunc(func() {
writeHitTheDatastore = true
})

cachedbs.DeleteBlock(b.Key())
cachedbs.Put(b)
if !writeHitTheDatastore {
t.Fail()
}
}

func TestElideDuplicateWrite(t *testing.T) {
cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()}
bs := NewBlockstore(syncds.MutexWrap(cd))
cachedbs, err := testBloomCached(bs, nil)
if err != nil {
t.Fatal(err)
}

b1 := blocks.NewBlock([]byte("foo"))

cachedbs.Put(b1)
cd.SetFunc(func() {
t.Fatal("write hit the datastore")
})
cachedbs.Put(b1)
}
func TestHasIsBloomCached(t *testing.T) {
cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()}
Expand Down
6 changes: 4 additions & 2 deletions blocks/blockstore/caching.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ func CachedBlockstore(bs GCBlockstore,
return nil, errors.New("bloom filter hash count can't be 0 when there is size set")
}
if opts.HasBloomFilterSize != 0 {
cbs, err = bloomCached(cbs, ctx, opts.HasBloomFilterSize, opts.HasBloomFilterHashes,
opts.HasARCCacheSize)
cbs, err = bloomCached(cbs, ctx, opts.HasBloomFilterSize, opts.HasBloomFilterHashes)
}
if opts.HasARCCacheSize > 0 {
cbs, err = arcCached(cbs, opts.HasARCCacheSize)
}

return cbs, err
Expand Down