Skip to content

Commit

Permalink
Add basic support for multiple blockstores as outlined in #3119.
Browse files Browse the repository at this point in the history
Each datastore is mounted under a different mount point and a
multi-blockstore is used to check each mount point for the block.

The first mount checked of the multi-blockstore is considered the
"cache", all others are considered read-only.  This implies that the
garbage collector only removes block from the first mount.

This change also factors out the pinlock from the blockstore into its
own structure.  Only the multi-datastore now implements the
GCBlockstore interface.  In the future this could be separated out
from the blockstore completely.

For now caching is only done on the first mount, in the future this
could be reworked.  The bloom filter is the most problematic as the
read-only mounts are not necessary immutable and can be changed by
methods outside of the blockstore.

Right now there is only one mount, but that will soon change once
support for the filestore is added.

License: MIT
Signed-off-by: Kevin Atkinson <[email protected]>
  • Loading branch information
kevina committed Sep 25, 2016
1 parent 300187a commit 83447c1
Show file tree
Hide file tree
Showing 13 changed files with 256 additions and 54 deletions.
2 changes: 1 addition & 1 deletion blocks/blockstore/arc_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

var exampleBlock = blocks.NewBlock([]byte("foo"))

func testArcCached(bs GCBlockstore, ctx context.Context) (*arccache, error) {
func testArcCached(bs Blockstore, ctx context.Context) (*arccache, error) {
if ctx == nil {
ctx = context.TODO()
}
Expand Down
55 changes: 32 additions & 23 deletions blocks/blockstore/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import (
var log = logging.Logger("blockstore")

// BlockPrefix namespaces blockstore datastores
var BlockPrefix = ds.NewKey("blocks")
const DefaultPrefix = "/blocks"

var blockPrefix = ds.NewKey(DefaultPrefix)

var ValueTypeMismatch = errors.New("the retrieved value is not a Block")
var ErrHashMismatch = errors.New("block in storage has different hash than requested")
Expand All @@ -38,9 +40,7 @@ type Blockstore interface {
AllKeysChan(ctx context.Context) (<-chan key.Key, error)
}

type GCBlockstore interface {
Blockstore

type GCLocker interface {
// GCLock locks the blockstore for garbage collection. No operations
// that expect to finish with a pin should ocurr simultaneously.
// Reading during GC is safe, and requires no lock.
Expand All @@ -57,21 +57,32 @@ type GCBlockstore interface {
GCRequested() bool
}

type GCBlockstore interface {
Blockstore
GCLocker
}

func NewBlockstore(d ds.Batching) *blockstore {
return NewBlockstoreWPrefix(d, "")
}

func NewBlockstoreWPrefix(d ds.Batching, prefix string) *blockstore {
if prefix == "" {
prefix = DefaultPrefix
}
var dsb ds.Batching
dd := dsns.Wrap(d, BlockPrefix)
prefixKey := ds.NewKey(prefix)
dd := dsns.Wrap(d, prefixKey)
dsb = dd
return &blockstore{
datastore: dsb,
prefix: prefixKey,
}
}

type blockstore struct {
datastore ds.Batching

lk sync.RWMutex
gcreq int32
gcreqlk sync.Mutex
prefix ds.Key

rehash bool
}
Expand Down Expand Up @@ -112,11 +123,8 @@ func (bs *blockstore) Get(k key.Key) (blocks.Block, error) {
func (bs *blockstore) Put(block blocks.Block) error {
k := block.Key().DsKey()

// Has is cheaper than Put, so see if we already have it
exists, err := bs.datastore.Has(k)
if err == nil && exists {
return nil // already stored.
}
// Note: The Has Check is now done by the MultiBlockstore

return bs.datastore.Put(k, block.RawData())
}

Expand All @@ -127,11 +135,6 @@ func (bs *blockstore) PutMany(blocks []blocks.Block) error {
}
for _, b := range blocks {
k := b.Key().DsKey()
exists, err := bs.datastore.Has(k)
if err == nil && exists {
continue
}

err = t.Put(k, b.RawData())
if err != nil {
return err
Expand All @@ -157,7 +160,7 @@ func (bs *blockstore) AllKeysChan(ctx context.Context) (<-chan key.Key, error) {
// KeysOnly, because that would be _a lot_ of data.
q := dsq.Query{KeysOnly: true}
// datastore/namespace does *NOT* fix up Query.Prefix
q.Prefix = BlockPrefix.String()
q.Prefix = bs.prefix.String()
res, err := bs.datastore.Query(q)
if err != nil {
return nil, err
Expand Down Expand Up @@ -223,6 +226,12 @@ func (bs *blockstore) AllKeysChan(ctx context.Context) (<-chan key.Key, error) {
return output, nil
}

type gclocker struct {
lk sync.RWMutex
gcreq int32
gcreqlk sync.Mutex
}

type Unlocker interface {
Unlock()
}
Expand All @@ -236,18 +245,18 @@ func (u *unlocker) Unlock() {
u.unlock = nil // ensure its not called twice
}

func (bs *blockstore) GCLock() Unlocker {
func (bs *gclocker) GCLock() Unlocker {
atomic.AddInt32(&bs.gcreq, 1)
bs.lk.Lock()
atomic.AddInt32(&bs.gcreq, -1)
return &unlocker{bs.lk.Unlock}
}

func (bs *blockstore) PinLock() Unlocker {
func (bs *gclocker) PinLock() Unlocker {
bs.lk.RLock()
return &unlocker{bs.lk.RUnlock}
}

func (bs *blockstore) GCRequested() bool {
func (bs *gclocker) GCRequested() bool {
return atomic.LoadInt32(&bs.gcreq) > 0
}
4 changes: 2 additions & 2 deletions blocks/blockstore/blockstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func TestAllKeysRespectsContext(t *testing.T) {
default:
}

e := dsq.Entry{Key: BlockPrefix.ChildString("foo").String()}
e := dsq.Entry{Key: blockPrefix.ChildString("foo").String()}
resultChan <- dsq.Result{Entry: e} // let it go.
close(resultChan)
<-done // should be done now.
Expand All @@ -188,7 +188,7 @@ func TestValueTypeMismatch(t *testing.T) {
block := blocks.NewBlock([]byte("some data"))

datastore := ds.NewMapDatastore()
k := BlockPrefix.Child(block.Key().DsKey())
k := blockPrefix.Child(block.Key().DsKey())
datastore.Put(k, "data that isn't a block!")

blockstore := NewBlockstore(ds_sync.MutexWrap(datastore))
Expand Down
8 changes: 4 additions & 4 deletions blocks/blockstore/bloom_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
syncds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore/sync"
)

func testBloomCached(bs GCBlockstore, ctx context.Context) (*bloomcache, error) {
func testBloomCached(bs Blockstore, ctx context.Context) (*bloomcache, error) {
if ctx == nil {
ctx = context.TODO()
}
Expand Down Expand Up @@ -104,11 +104,11 @@ func TestHasIsBloomCached(t *testing.T) {
block := blocks.NewBlock([]byte("newBlock"))

cachedbs.PutMany([]blocks.Block{block})
if cacheFails != 2 {
t.Fatalf("expected two datastore hits: %d", cacheFails)
if cacheFails != 1 {
t.Fatalf("expected datastore hits: %d", cacheFails)
}
cachedbs.Put(block)
if cacheFails != 3 {
if cacheFails != 2 {
t.Fatalf("expected datastore hit: %d", cacheFails)
}

Expand Down
4 changes: 2 additions & 2 deletions blocks/blockstore/caching.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ func DefaultCacheOpts() CacheOpts {
}
}

func CachedBlockstore(bs GCBlockstore,
ctx context.Context, opts CacheOpts) (cbs GCBlockstore, err error) {
func CachedBlockstore(bs Blockstore,
ctx context.Context, opts CacheOpts) (cbs Blockstore, err error) {
cbs = bs

if opts.HasBloomFilterSize < 0 || opts.HasBloomFilterHashes < 0 ||
Expand Down
120 changes: 120 additions & 0 deletions blocks/blockstore/multi.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package blockstore

// A very simple multi-blockstore that analogous to a unionfs Put and
// DeleteBlock only go to the first blockstore all others are
// considered readonly.

import (
//"errors"

blocks "github.com/ipfs/go-ipfs/blocks"
key "gx/ipfs/Qmce4Y4zg3sYr7xKM5UueS67vhNni6EeWgCRnb7MbLJMew/go-key"
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
)

type MultiBlockstore interface {
Blockstore
GCLocker
FirstMount() Blockstore
Mounts() []string
Mount(prefix string) Blockstore
}

type Mount struct {
Prefix string
Blocks Blockstore
}

func NewMultiBlockstore(mounts ...Mount) *multiblockstore {
return &multiblockstore{
mounts: mounts,
}
}

type multiblockstore struct {
mounts []Mount
gclocker
}

func (bs *multiblockstore) FirstMount() Blockstore {
return bs.mounts[0].Blocks
}

func (bs *multiblockstore) Mounts() []string {
mounts := make([]string, 0, len(bs.mounts))
for _, mnt := range bs.mounts {
mounts = append(mounts, mnt.Prefix)
}
return mounts
}

func (bs *multiblockstore) Mount(prefix string) Blockstore {
for _, m := range bs.mounts {
if m.Prefix == prefix {
return m.Blocks
}
}
return nil
}

func (bs *multiblockstore) DeleteBlock(key key.Key) error {
return bs.mounts[0].Blocks.DeleteBlock(key)
}

func (bs *multiblockstore) Has(key key.Key) (bool, error) {
var firstErr error
for _, m := range bs.mounts {
have, err := m.Blocks.Has(key)
if have && err == nil {
return have, nil
}
if err != nil && firstErr == nil {
firstErr = err
}
}
return false, firstErr
}

func (bs *multiblockstore) Get(key key.Key) (blocks.Block, error) {
var firstErr error
for _, m := range bs.mounts {
blk, err := m.Blocks.Get(key)
if err == nil {
return blk, nil
}
if firstErr == nil || firstErr == ErrNotFound {
firstErr = err
}
}
return nil, firstErr
}

func (bs *multiblockstore) Put(blk blocks.Block) error {
// Has is cheaper than Put, so see if we already have it
exists, err := bs.Has(blk.Key())
if err == nil && exists {
return nil // already stored
}
return bs.mounts[0].Blocks.Put(blk)
}

func (bs *multiblockstore) PutMany(blks []blocks.Block) error {
stilladd := make([]blocks.Block, 0, len(blks))
// Has is cheaper than Put, so if we already have it then skip
for _, blk := range blks {
exists, err := bs.Has(blk.Key())
if err == nil && exists {
continue // already stored
}
stilladd = append(stilladd, blk)
}
if len(stilladd) == 0 {
return nil
}
return bs.mounts[0].Blocks.PutMany(stilladd)
}

func (bs *multiblockstore) AllKeysChan(ctx context.Context) (<-chan key.Key, error) {
return bs.mounts[0].Blocks.AllKeysChan(ctx)
//return nil, errors.New("Unimplemented")
}
14 changes: 12 additions & 2 deletions core/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
pin "github.com/ipfs/go-ipfs/pin"
repo "github.com/ipfs/go-ipfs/repo"
cfg "github.com/ipfs/go-ipfs/repo/config"
fsrepo "github.com/ipfs/go-ipfs/repo/fsrepo"

retry "gx/ipfs/QmPF5kxTYFkzhaY5LmkExood7aTTZBHWQC6cjdDQBuGrjp/retry-datastore"
metrics "gx/ipfs/QmRg1gKTHzc3CZXSKzem8aR4E3TubFhbgXwfVuWnSK5CC5/go-metrics-interface"
Expand Down Expand Up @@ -156,7 +157,7 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {
}

var err error
bs := bstore.NewBlockstore(rds)
bs := bstore.NewBlockstoreWPrefix(rds, fsrepo.CacheMount)
opts := bstore.DefaultCacheOpts()
conf, err := n.Repo.Config()
if err != nil {
Expand All @@ -168,11 +169,20 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {
opts.HasBloomFilterSize = 0
}

n.Blockstore, err = bstore.CachedBlockstore(bs, ctx, opts)
cbs, err := bstore.CachedBlockstore(bs, ctx, opts)
if err != nil {
return err
}

mounts := []bstore.Mount{{fsrepo.CacheMount, cbs}}

if n.Repo.DirectMount(fsrepo.FilestoreMount) != nil {
fs := bstore.NewBlockstoreWPrefix(n.Repo.Datastore(), fsrepo.FilestoreMount)
mounts = append(mounts, bstore.Mount{fsrepo.FilestoreMount, fs})
}

n.Blockstore = bstore.NewMultiBlockstore(mounts...)

rcfg, err := n.Repo.Config()
if err != nil {
return err
Expand Down
10 changes: 5 additions & 5 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,11 @@ type IpfsNode struct {
PrivateKey ic.PrivKey // the local node's private Key

// Services
Peerstore pstore.Peerstore // storage for other Peer instances
Blockstore bstore.GCBlockstore // the block store (lower level)
Blocks *bserv.BlockService // the block service, get/add blocks.
DAG merkledag.DAGService // the merkle dag service, get/add objects.
Resolver *path.Resolver // the path resolution system
Peerstore pstore.Peerstore // storage for other Peer instances
Blockstore bstore.MultiBlockstore // the block store (lower level)
Blocks *bserv.BlockService // the block service, get/add blocks.
DAG merkledag.DAGService // the merkle dag service, get/add objects.
Resolver *path.Resolver // the path resolution system
Reporter metrics.Reporter
Discovery discovery.Service
FilesRoot *mfs.Root
Expand Down
Loading

0 comments on commit 83447c1

Please sign in to comment.