Skip to content

Commit

Permalink
Add basic support for multiple blockstores as outlined in #3119.
Browse files Browse the repository at this point in the history
Each datastore is mounted under a different mount point and a
multi-blockstore is used to check each mount point for the block.

The first mount checked of the multi-blockstore is considered the
"cache", all others are considered read-only.  This implies that the
garbage collector only removes block from the first mount.

This change also factors out the pinlock from the blockstore into its
own structure.  Only the multi-datastore now implements the
GCBlockstore interface.  In the future this could be separated out
from the blockstore completely.

For now caching is only done on the first mount, in the future this
could be reworked.  The bloom filter is the most problematic as the
read-only mounts are not necessary immutable and can be changed by
methods outside of the blockstore.

Right now there is only one mount, but that will soon change once
support for the filestore is added.

License: MIT
Signed-off-by: Kevin Atkinson <[email protected]>
  • Loading branch information
kevina committed Nov 3, 2016
1 parent 1ca2d42 commit 3b752eb
Show file tree
Hide file tree
Showing 10 changed files with 231 additions and 43 deletions.
34 changes: 17 additions & 17 deletions blocks/blockstore/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (
var log = logging.Logger("blockstore")

// BlockPrefix namespaces blockstore datastores
var BlockPrefix = ds.NewKey("blocks")
const DefaultPrefix = "/blocks"

var blockPrefix = ds.NewKey(DefaultPrefix)

var ValueTypeMismatch = errors.New("the retrieved value is not a Block")
var ErrHashMismatch = errors.New("block in storage has different hash than requested")
Expand Down Expand Up @@ -71,20 +73,26 @@ type gcBlockstore struct {
}

func NewBlockstore(d ds.Batching) *blockstore {
return NewBlockstoreWPrefix(d, "")
}

func NewBlockstoreWPrefix(d ds.Batching, prefix string) *blockstore {
if prefix == "" {
prefix = DefaultPrefix
}
var dsb ds.Batching
dd := dsns.Wrap(d, BlockPrefix)
prefixKey := ds.NewKey(prefix)
dd := dsns.Wrap(d, prefixKey)
dsb = dd
return &blockstore{
datastore: dsb,
prefix: prefixKey,
}
}

type blockstore struct {
datastore ds.Batching

lk sync.RWMutex
gcreq int32
gcreqlk sync.Mutex
prefix ds.Key

rehash bool
}
Expand Down Expand Up @@ -130,11 +138,8 @@ func (bs *blockstore) Get(k *cid.Cid) (blocks.Block, error) {
func (bs *blockstore) Put(block blocks.Block) error {
k := dshelp.CidToDsKey(block.Cid())

// Has is cheaper than Put, so see if we already have it
exists, err := bs.datastore.Has(k)
if err == nil && exists {
return nil // already stored.
}
// Note: The Has Check is now done by the MultiBlockstore

return bs.datastore.Put(k, block.RawData())
}

Expand All @@ -145,11 +150,6 @@ func (bs *blockstore) PutMany(blocks []blocks.Block) error {
}
for _, b := range blocks {
k := dshelp.CidToDsKey(b.Cid())
exists, err := bs.datastore.Has(k)
if err == nil && exists {
continue
}

err = t.Put(k, b.RawData())
if err != nil {
return err
Expand All @@ -175,7 +175,7 @@ func (bs *blockstore) AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error)
// KeysOnly, because that would be _a lot_ of data.
q := dsq.Query{KeysOnly: true}
// datastore/namespace does *NOT* fix up Query.Prefix
q.Prefix = BlockPrefix.String()
q.Prefix = bs.prefix.String()
res, err := bs.datastore.Query(q)
if err != nil {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions blocks/blockstore/blockstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func TestAllKeysRespectsContext(t *testing.T) {
default:
}

e := dsq.Entry{Key: BlockPrefix.ChildString("foo").String()}
e := dsq.Entry{Key: blockPrefix.ChildString("foo").String()}
resultChan <- dsq.Result{Entry: e} // let it go.
close(resultChan)
<-done // should be done now.
Expand All @@ -190,7 +190,7 @@ func TestValueTypeMismatch(t *testing.T) {
block := blocks.NewBlock([]byte("some data"))

datastore := ds.NewMapDatastore()
k := BlockPrefix.Child(dshelp.CidToDsKey(block.Cid()))
k := blockPrefix.Child(dshelp.CidToDsKey(block.Cid()))
datastore.Put(k, "data that isn't a block!")

blockstore := NewBlockstore(ds_sync.MutexWrap(datastore))
Expand Down
6 changes: 3 additions & 3 deletions blocks/blockstore/bloom_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,11 @@ func TestHasIsBloomCached(t *testing.T) {
block := blocks.NewBlock([]byte("newBlock"))

cachedbs.PutMany([]blocks.Block{block})
if cacheFails != 2 {
t.Fatalf("expected two datastore hits: %d", cacheFails)
if cacheFails != 1 {
t.Fatalf("expected datastore hits: %d", cacheFails)
}
cachedbs.Put(block)
if cacheFails != 3 {
if cacheFails != 2 {
t.Fatalf("expected datastore hit: %d", cacheFails)
}

Expand Down
122 changes: 122 additions & 0 deletions blocks/blockstore/multi.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package blockstore

// A very simple multi-blockstore that analogous to a unionfs Put and
// DeleteBlock only go to the first blockstore all others are
// considered readonly.

import (
//"errors"
"context"

blocks "github.com/ipfs/go-ipfs/blocks"
cid "gx/ipfs/QmXfiyr2RWEXpVDdaYnD2HNiBk6UBddsvEP4RPfXb6nGqY/go-cid"
)

type MultiBlockstore interface {
Blockstore
GCLocker
FirstMount() Blockstore
Mounts() []string
Mount(prefix string) Blockstore
}

type Mount struct {
Prefix string
Blocks Blockstore
}

func NewMultiBlockstore(mounts ...Mount) *multiblockstore {
return &multiblockstore{
mounts: mounts,
}
}

type multiblockstore struct {
mounts []Mount
gclocker
}

func (bs *multiblockstore) FirstMount() Blockstore {
return bs.mounts[0].Blocks
}

func (bs *multiblockstore) Mounts() []string {
mounts := make([]string, 0, len(bs.mounts))
for _, mnt := range bs.mounts {
mounts = append(mounts, mnt.Prefix)
}
return mounts
}

func (bs *multiblockstore) Mount(prefix string) Blockstore {
for _, m := range bs.mounts {
if m.Prefix == prefix {
return m.Blocks
}
}
return nil
}

func (bs *multiblockstore) DeleteBlock(key *cid.Cid) error {
return bs.mounts[0].Blocks.DeleteBlock(key)
}

func (bs *multiblockstore) Has(c *cid.Cid) (bool, error) {
var firstErr error
for _, m := range bs.mounts {
have, err := m.Blocks.Has(c)
if have && err == nil {
return have, nil
}
if err != nil && firstErr == nil {
firstErr = err
}
}
return false, firstErr
}

func (bs *multiblockstore) Get(c *cid.Cid) (blocks.Block, error) {
var firstErr error
for _, m := range bs.mounts {
blk, err := m.Blocks.Get(c)
if err == nil {
return blk, nil
}
if firstErr == nil || firstErr == ErrNotFound {
firstErr = err
}
}
return nil, firstErr
}

func (bs *multiblockstore) Put(blk blocks.Block) error {
// First call Has() to make sure the block doesn't exist in any of
// the sub-blockstores, otherwise we could end with data being
// duplicated in two blockstores.
exists, err := bs.Has(blk.Cid())
if err == nil && exists {
return nil // already stored
}
return bs.mounts[0].Blocks.Put(blk)
}

func (bs *multiblockstore) PutMany(blks []blocks.Block) error {
stilladd := make([]blocks.Block, 0, len(blks))
// Has is cheaper than Put, so if we already have it then skip
for _, blk := range blks {
exists, err := bs.Has(blk.Cid())
if err == nil && exists {
continue // already stored
}
stilladd = append(stilladd, blk)
}
if len(stilladd) == 0 {
return nil
}
return bs.mounts[0].Blocks.PutMany(stilladd)
}

func (bs *multiblockstore) AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error) {
return bs.mounts[0].Blocks.AllKeysChan(ctx)
//return nil, errors.New("Unimplemented")
}
7 changes: 5 additions & 2 deletions core/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
pin "github.com/ipfs/go-ipfs/pin"
repo "github.com/ipfs/go-ipfs/repo"
cfg "github.com/ipfs/go-ipfs/repo/config"
fsrepo "github.com/ipfs/go-ipfs/repo/fsrepo"

context "context"
retry "gx/ipfs/QmPF5kxTYFkzhaY5LmkExood7aTTZBHWQC6cjdDQBuGrjp/retry-datastore"
Expand Down Expand Up @@ -167,7 +168,7 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {
}

var err error
bs := bstore.NewBlockstore(rds)
bs := bstore.NewBlockstoreWPrefix(rds, fsrepo.CacheMount)
opts := bstore.DefaultCacheOpts()
conf, err := n.Repo.Config()
if err != nil {
Expand All @@ -184,7 +185,9 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {
return err
}

n.Blockstore = bstore.NewGCBlockstore(cbs, bstore.NewGCLocker())
mounts := []bstore.Mount{{fsrepo.CacheMount, cbs}}

n.Blockstore = bstore.NewMultiBlockstore(mounts...)

rcfg, err := n.Repo.Config()
if err != nil {
Expand Down
10 changes: 5 additions & 5 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,11 @@ type IpfsNode struct {
PrivateKey ic.PrivKey // the local node's private Key

// Services
Peerstore pstore.Peerstore // storage for other Peer instances
Blockstore bstore.GCBlockstore // the block store (lower level)
Blocks bserv.BlockService // the block service, get/add blocks.
DAG merkledag.DAGService // the merkle dag service, get/add objects.
Resolver *path.Resolver // the path resolution system
Peerstore pstore.Peerstore // storage for other Peer instances
Blockstore bstore.MultiBlockstore // the block store (lower level)
Blocks bserv.BlockService // the block service, get/add blocks.
DAG merkledag.DAGService // the merkle dag service, get/add objects.
Resolver *path.Resolver // the path resolution system
Reporter metrics.Reporter
Discovery discovery.Service
FilesRoot *mfs.Root
Expand Down
39 changes: 26 additions & 13 deletions repo/fsrepo/defaultds.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/ipfs/go-ipfs/thirdparty/dir"
"gx/ipfs/QmU4VzzKNLJXJ72SedXBQKyf5Jo8W89iWpbWQjHn9qef8N/go-ds-flatfs"
levelds "gx/ipfs/QmUHmMGmcwCrjHQHcYhBnqGCSWs5pBSMbGZmfwavETR1gg/go-ds-leveldb"
//multi "github.com/ipfs/go-ipfs/repo/multi"
ldbopts "gx/ipfs/QmbBhyDKsY4mbY6xsKt3qu9Y7FPvMJ6qbD8AMjYYvPRw1g/goleveldb/leveldb/opt"
ds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore"
mount "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore/syncmount"
Expand All @@ -20,23 +21,29 @@ const (
flatfsDirectory = "blocks"
)

func openDefaultDatastore(r *FSRepo) (repo.Datastore, error) {
const (
RootMount = "/"
CacheMount = "/blocks" // needs to be the same as blockstore.DefaultPrefix
FilestoreMount = "/filestore"
)

func openDefaultDatastore(r *FSRepo) (repo.Datastore, []Mount, error) {
leveldbPath := path.Join(r.path, leveldbDirectory)

// save leveldb reference so it can be neatly closed afterward
leveldbDS, err := levelds.NewDatastore(leveldbPath, &levelds.Options{
Compression: ldbopts.NoCompression,
})
if err != nil {
return nil, fmt.Errorf("unable to open leveldb datastore: %v", err)
return nil, nil, fmt.Errorf("unable to open leveldb datastore: %v", err)
}

syncfs := !r.config.Datastore.NoSync
// 5 bytes of prefix gives us 25 bits of freedom, 16 of which are taken by
// by the Qm prefix. Leaving us with 9 bits, or 512 way sharding
blocksDS, err := flatfs.New(path.Join(r.path, flatfsDirectory), 5, syncfs)
if err != nil {
return nil, fmt.Errorf("unable to open flatfs datastore: %v", err)
return nil, nil, fmt.Errorf("unable to open flatfs datastore: %v", err)
}

// Add our PeerID to metrics paths to keep them unique
Expand All @@ -51,18 +58,24 @@ func openDefaultDatastore(r *FSRepo) (repo.Datastore, error) {
prefix := "fsrepo." + id + ".datastore."
metricsBlocks := measure.New(prefix+"blocks", blocksDS)
metricsLevelDB := measure.New(prefix+"leveldb", leveldbDS)
mountDS := mount.New([]mount.Mount{
{
Prefix: ds.NewKey("/blocks"),
Datastore: metricsBlocks,
},
{
Prefix: ds.NewKey("/"),
Datastore: metricsLevelDB,
},

var mounts []mount.Mount
var directMounts []Mount

mounts = append(mounts, mount.Mount{
Prefix: ds.NewKey(CacheMount),
Datastore: metricsBlocks,
})
directMounts = append(directMounts, Mount{CacheMount, blocksDS})
mounts = append(mounts, mount.Mount{
Prefix: ds.NewKey(RootMount),
Datastore: metricsLevelDB,
})
directMounts = append(directMounts, Mount{RootMount, leveldbDS})

mountDS := mount.New(mounts)

return mountDS, nil
return mountDS, directMounts, nil
}

func initDefaultDatastore(repoPath string, conf *config.Config) error {
Expand Down
Loading

0 comments on commit 3b752eb

Please sign in to comment.