Skip to content

Commit

Permalink
Add basic support for multiple blockstores as outlined in ipfs#3119.
Browse files Browse the repository at this point in the history
Each datastore is mounted under a different mount point and a
multi-blockstore is used to check each mount point for the block.

The first mount checked of the multi-blockstore is considered the
"cache", all others are considered read-only.  This implies that the
garbage collector only removes blocks from the first mount.

This change also factors out the pinlock from the blockstore into its
own structure.  Only the multi-blockstore now implements the
GCBlockstore interface.  In the future this could be separated out
from the blockstore completely.

For now caching is only done on the first mount; in the future this
could be reworked.  The bloom filter is the most problematic as the
read-only mounts are not necessarily immutable and can be changed by
methods outside of the blockstore.

Right now there is only one mount, but that will soon change once
support for the filestore is added.

License: MIT
Signed-off-by: Kevin Atkinson <[email protected]>
  • Loading branch information
kevina committed Oct 15, 2016
1 parent d5c716a commit c8f95c4
Show file tree
Hide file tree
Showing 14 changed files with 261 additions and 57 deletions.
2 changes: 1 addition & 1 deletion blocks/blockstore/arc_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

var exampleBlock = blocks.NewBlock([]byte("foo"))

func testArcCached(bs GCBlockstore, ctx context.Context) (*arccache, error) {
func testArcCached(bs Blockstore, ctx context.Context) (*arccache, error) {
if ctx == nil {
ctx = context.TODO()
}
Expand Down
55 changes: 32 additions & 23 deletions blocks/blockstore/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (
var log = logging.Logger("blockstore")

// BlockPrefix namespaces blockstore datastores
var BlockPrefix = ds.NewKey("blocks")
const DefaultPrefix = "/blocks"

var blockPrefix = ds.NewKey(DefaultPrefix)

var ValueTypeMismatch = errors.New("the retrieved value is not a Block")
var ErrHashMismatch = errors.New("block in storage has different hash than requested")
Expand All @@ -39,9 +41,7 @@ type Blockstore interface {
AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error)
}

type GCBlockstore interface {
Blockstore

type GCLocker interface {
// GCLock locks the blockstore for garbage collection. No operations
// that expect to finish with a pin should occur simultaneously.
// Reading during GC is safe, and requires no lock.
Expand All @@ -58,21 +58,32 @@ type GCBlockstore interface {
GCRequested() bool
}

// GCBlockstore is a Blockstore that also provides the garbage-collection
// and pin locking methods of GCLocker.
type GCBlockstore interface {
Blockstore
GCLocker
}

// NewBlockstore returns a blockstore backed by d. It passes an empty
// prefix to NewBlockstoreWPrefix, which substitutes DefaultPrefix.
func NewBlockstore(d ds.Batching) *blockstore {
return NewBlockstoreWPrefix(d, "")
}

func NewBlockstoreWPrefix(d ds.Batching, prefix string) *blockstore {
if prefix == "" {
prefix = DefaultPrefix
}
var dsb ds.Batching
dd := dsns.Wrap(d, BlockPrefix)
prefixKey := ds.NewKey(prefix)
dd := dsns.Wrap(d, prefixKey)
dsb = dd
return &blockstore{
datastore: dsb,
prefix: prefixKey,
}
}

type blockstore struct {
datastore ds.Batching

lk sync.RWMutex
gcreq int32
gcreqlk sync.Mutex
prefix ds.Key

rehash bool
}
Expand Down Expand Up @@ -114,11 +125,8 @@ func (bs *blockstore) Get(k *cid.Cid) (blocks.Block, error) {
func (bs *blockstore) Put(block blocks.Block) error {
k := dshelp.NewKeyFromBinary(block.Cid().KeyString())

// Has is cheaper than Put, so see if we already have it
exists, err := bs.datastore.Has(k)
if err == nil && exists {
return nil // already stored.
}
// Note: The Has Check is now done by the MultiBlockstore

return bs.datastore.Put(k, block.RawData())
}

Expand All @@ -129,11 +137,6 @@ func (bs *blockstore) PutMany(blocks []blocks.Block) error {
}
for _, b := range blocks {
k := dshelp.NewKeyFromBinary(b.Cid().KeyString())
exists, err := bs.datastore.Has(k)
if err == nil && exists {
continue
}

err = t.Put(k, b.RawData())
if err != nil {
return err
Expand All @@ -159,7 +162,7 @@ func (bs *blockstore) AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error)
// KeysOnly, because that would be _a lot_ of data.
q := dsq.Query{KeysOnly: true}
// datastore/namespace does *NOT* fix up Query.Prefix
q.Prefix = BlockPrefix.String()
q.Prefix = bs.prefix.String()
res, err := bs.datastore.Query(q)
if err != nil {
return nil, err
Expand Down Expand Up @@ -224,6 +227,12 @@ func (bs *blockstore) AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error)
return output, nil
}

// gclocker holds the locking state behind GCLock, PinLock and GCRequested.
type gclocker struct {
// lk is write-locked by GCLock and read-locked by PinLock.
lk sync.RWMutex
// gcreq counts GCLock callers currently waiting to acquire lk; it is
// read and written with sync/atomic.
gcreq int32
// NOTE(review): gcreqlk is not used by any method visible here — confirm
// whether it is still needed.
gcreqlk sync.Mutex
}

// Unlocker is the handle returned by GCLock and PinLock; it releases the
// lock that was taken.
type Unlocker interface {
// Unlock releases the held lock.
Unlock()
}
Expand All @@ -237,18 +246,18 @@ func (u *unlocker) Unlock() {
u.unlock = nil // ensure its not called twice
}

func (bs *blockstore) GCLock() Unlocker {
func (bs *gclocker) GCLock() Unlocker {
atomic.AddInt32(&bs.gcreq, 1)
bs.lk.Lock()
atomic.AddInt32(&bs.gcreq, -1)
return &unlocker{bs.lk.Unlock}
}

func (bs *blockstore) PinLock() Unlocker {
func (bs *gclocker) PinLock() Unlocker {
bs.lk.RLock()
return &unlocker{bs.lk.RUnlock}
}

func (bs *blockstore) GCRequested() bool {
func (bs *gclocker) GCRequested() bool {
return atomic.LoadInt32(&bs.gcreq) > 0
}
4 changes: 2 additions & 2 deletions blocks/blockstore/blockstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func TestAllKeysRespectsContext(t *testing.T) {
default:
}

e := dsq.Entry{Key: BlockPrefix.ChildString("foo").String()}
e := dsq.Entry{Key: blockPrefix.ChildString("foo").String()}
resultChan <- dsq.Result{Entry: e} // let it go.
close(resultChan)
<-done // should be done now.
Expand All @@ -190,7 +190,7 @@ func TestValueTypeMismatch(t *testing.T) {
block := blocks.NewBlock([]byte("some data"))

datastore := ds.NewMapDatastore()
k := BlockPrefix.Child(dshelp.NewKeyFromBinary(block.Cid().KeyString()))
k := blockPrefix.Child(dshelp.NewKeyFromBinary(block.Cid().KeyString()))
datastore.Put(k, "data that isn't a block!")

blockstore := NewBlockstore(ds_sync.MutexWrap(datastore))
Expand Down
8 changes: 4 additions & 4 deletions blocks/blockstore/bloom_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
syncds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore/sync"
)

func testBloomCached(bs GCBlockstore, ctx context.Context) (*bloomcache, error) {
func testBloomCached(bs Blockstore, ctx context.Context) (*bloomcache, error) {
if ctx == nil {
ctx = context.TODO()
}
Expand Down Expand Up @@ -104,11 +104,11 @@ func TestHasIsBloomCached(t *testing.T) {
block := blocks.NewBlock([]byte("newBlock"))

cachedbs.PutMany([]blocks.Block{block})
if cacheFails != 2 {
t.Fatalf("expected two datastore hits: %d", cacheFails)
if cacheFails != 1 {
t.Fatalf("expected datastore hits: %d", cacheFails)
}
cachedbs.Put(block)
if cacheFails != 3 {
if cacheFails != 2 {
t.Fatalf("expected datastore hit: %d", cacheFails)
}

Expand Down
4 changes: 2 additions & 2 deletions blocks/blockstore/caching.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ func DefaultCacheOpts() CacheOpts {
}
}

func CachedBlockstore(bs GCBlockstore,
ctx context.Context, opts CacheOpts) (cbs GCBlockstore, err error) {
func CachedBlockstore(bs Blockstore,
ctx context.Context, opts CacheOpts) (cbs Blockstore, err error) {
cbs = bs

if opts.HasBloomFilterSize < 0 || opts.HasBloomFilterHashes < 0 ||
Expand Down
122 changes: 122 additions & 0 deletions blocks/blockstore/multi.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package blockstore

// A very simple multi-blockstore that is analogous to a unionfs: Put and
// DeleteBlock only go to the first blockstore; all others are
// considered read-only.

import (
//"errors"
"context"

blocks "github.com/ipfs/go-ipfs/blocks"
cid "gx/ipfs/QmXUuRadqDq5BuFWzVU6VuKaSjTcNm1gNCtLvvP1TJCW4z/go-cid"
)

// MultiBlockstore is a Blockstore made up of several mounted blockstores.
// It also carries the GC and pin locking methods of GCLocker.
type MultiBlockstore interface {
Blockstore
GCLocker
// FirstMount returns the blockstore of the first mount.
FirstMount() Blockstore
// Mounts returns the prefixes of all mounts, in mount order.
Mounts() []string
// Mount returns the blockstore mounted under prefix, or nil if no
// mount has that prefix.
Mount(prefix string) Blockstore
}

// Mount pairs a blockstore with the prefix it is mounted under.
type Mount struct {
// Prefix identifies the mount; lookups by prefix compare against it
// exactly.
Prefix string
// Blocks is the blockstore served at this mount.
Blocks Blockstore
}

// NewMultiBlockstore builds a multi-blockstore over the given mounts.
// Order is significant: the first mount receives every write and delete,
// and lookups consult the mounts in the order given.
func NewMultiBlockstore(mounts ...Mount) *multiblockstore {
	mbs := new(multiblockstore)
	mbs.mounts = mounts
	return mbs
}

// multiblockstore is a union of mounted blockstores. Reads (Has, Get)
// try each mount in order; Put, PutMany and DeleteBlock act on mounts[0]
// only.
type multiblockstore struct {
// mounts is the ordered list of mounted blockstores.
mounts []Mount
// gclocker is embedded to supply GCLock, PinLock and GCRequested.
gclocker
}

// FirstMount returns the blockstore of the first mount, the one that
// Put, PutMany and DeleteBlock operate on. It panics if the
// multi-blockstore was created without any mounts.
func (bs *multiblockstore) FirstMount() Blockstore {
	first := bs.mounts[0]
	return first.Blocks
}

// Mounts lists the prefix of every mount, in mount order. The result is
// a fresh, non-nil slice (empty when there are no mounts).
func (bs *multiblockstore) Mounts() []string {
	prefixes := make([]string, len(bs.mounts))
	for i := range bs.mounts {
		prefixes[i] = bs.mounts[i].Prefix
	}
	return prefixes
}

// Mount looks up a mount by exact prefix and returns its blockstore.
// The first match in mount order wins; nil is returned when no mount
// has the given prefix.
func (bs *multiblockstore) Mount(prefix string) Blockstore {
	for i := range bs.mounts {
		if bs.mounts[i].Prefix != prefix {
			continue
		}
		return bs.mounts[i].Blocks
	}
	return nil
}

// DeleteBlock removes the block from the first mount only; the other
// mounts are never modified through the multi-blockstore.
func (bs *multiblockstore) DeleteBlock(key *cid.Cid) error {
	first := bs.mounts[0].Blocks
	return first.DeleteBlock(key)
}

// Has reports whether any mount holds the block. Mounts are asked in
// order and the search stops at the first one that answers true without
// an error. If no mount has the block, the result is false together with
// the first error encountered along the way (nil if there was none).
func (bs *multiblockstore) Has(c *cid.Cid) (bool, error) {
	var pending error
	for i := range bs.mounts {
		found, err := bs.mounts[i].Blocks.Has(c)
		switch {
		case err != nil:
			// Remember only the earliest failure; keep looking in the
			// remaining mounts.
			if pending == nil {
				pending = err
			}
		case found:
			return true, nil
		}
	}
	return false, pending
}

// Get fetches the block from the first mount that can supply it, trying
// the mounts in order.
//
// If every mount fails, the returned error is the first one that is not
// ErrNotFound, so a real storage failure is not hidden behind a plain
// miss in another mount. ErrNotFound is returned when every mount
// reported a miss, and also when there are no mounts at all, so that a
// nil block is never returned together with a nil error.
func (bs *multiblockstore) Get(c *cid.Cid) (blocks.Block, error) {
	var firstErr error
	for _, m := range bs.mounts {
		blk, err := m.Blocks.Get(c)
		if err == nil {
			return blk, nil
		}
		// Replace a recorded ErrNotFound with whatever this mount
		// returned, but never overwrite a more specific error.
		if firstErr == nil || firstErr == ErrNotFound {
			firstErr = err
		}
	}
	if firstErr == nil {
		// No mount was consulted (empty mount list): treat as a miss,
		// matching Has, which reports false in that situation.
		return nil, ErrNotFound
	}
	return nil, firstErr
}

// Put stores the block in the first mount unless some mount already has
// it. The Has check spans every mount so that a block present in a
// read-only mount is not duplicated into the first one. An error from
// Has is not treated as "already stored": the write is attempted anyway.
func (bs *multiblockstore) Put(blk blocks.Block) error {
	if have, err := bs.Has(blk.Cid()); have && err == nil {
		return nil // already stored
	}
	return bs.mounts[0].Blocks.Put(blk)
}

// PutMany writes to the first mount every block that is not already
// known to be present in some mount. A block is skipped only when Has
// answers true without an error; if Has fails, the block is still
// written. Nothing is sent to the first mount when every block is
// already stored.
func (bs *multiblockstore) PutMany(blks []blocks.Block) error {
	missing := make([]blocks.Block, 0, len(blks))
	for i := range blks {
		have, err := bs.Has(blks[i].Cid())
		if err != nil || !have {
			missing = append(missing, blks[i])
		}
	}
	if len(missing) > 0 {
		return bs.mounts[0].Blocks.PutMany(missing)
	}
	return nil
}

// AllKeysChan enumerates the keys of the first mount only; keys held
// exclusively by other mounts are not reported.
func (bs *multiblockstore) AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error) {
	first := bs.mounts[0].Blocks
	return first.AllKeysChan(ctx)
}
6 changes: 3 additions & 3 deletions blockservice/blockservice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@ func TestWriteThroughWorks(t *testing.T) {
}
}

var _ blockstore.GCBlockstore = (*PutCountingBlockstore)(nil)
var _ blockstore.Blockstore = (*PutCountingBlockstore)(nil)

type PutCountingBlockstore struct {
blockstore.GCBlockstore
blockstore.Blockstore
PutCounter int
}

func (bs *PutCountingBlockstore) Put(block blocks.Block) error {
bs.PutCounter++
return bs.GCBlockstore.Put(block)
return bs.Blockstore.Put(block)
}
14 changes: 12 additions & 2 deletions core/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
pin "github.com/ipfs/go-ipfs/pin"
repo "github.com/ipfs/go-ipfs/repo"
cfg "github.com/ipfs/go-ipfs/repo/config"
fsrepo "github.com/ipfs/go-ipfs/repo/fsrepo"

context "context"
retry "gx/ipfs/QmPF5kxTYFkzhaY5LmkExood7aTTZBHWQC6cjdDQBuGrjp/retry-datastore"
Expand Down Expand Up @@ -167,7 +168,7 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {
}

var err error
bs := bstore.NewBlockstore(rds)
bs := bstore.NewBlockstoreWPrefix(rds, fsrepo.CacheMount)
opts := bstore.DefaultCacheOpts()
conf, err := n.Repo.Config()
if err != nil {
Expand All @@ -179,11 +180,20 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {
opts.HasBloomFilterSize = 0
}

n.Blockstore, err = bstore.CachedBlockstore(bs, ctx, opts)
cbs, err := bstore.CachedBlockstore(bs, ctx, opts)
if err != nil {
return err
}

mounts := []bstore.Mount{{fsrepo.CacheMount, cbs}}

if n.Repo.DirectMount(fsrepo.FilestoreMount) != nil {
fs := bstore.NewBlockstoreWPrefix(n.Repo.Datastore(), fsrepo.FilestoreMount)
mounts = append(mounts, bstore.Mount{fsrepo.FilestoreMount, fs})
}

n.Blockstore = bstore.NewMultiBlockstore(mounts...)

rcfg, err := n.Repo.Config()
if err != nil {
return err
Expand Down
10 changes: 5 additions & 5 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,11 @@ type IpfsNode struct {
PrivateKey ic.PrivKey // the local node's private Key

// Services
Peerstore pstore.Peerstore // storage for other Peer instances
Blockstore bstore.GCBlockstore // the block store (lower level)
Blocks bserv.BlockService // the block service, get/add blocks.
DAG merkledag.DAGService // the merkle dag service, get/add objects.
Resolver *path.Resolver // the path resolution system
Peerstore pstore.Peerstore // storage for other Peer instances
Blockstore bstore.MultiBlockstore // the block store (lower level)
Blocks bserv.BlockService // the block service, get/add blocks.
DAG merkledag.DAGService // the merkle dag service, get/add objects.
Resolver *path.Resolver // the path resolution system
Reporter metrics.Reporter
Discovery discovery.Service
FilesRoot *mfs.Root
Expand Down
Loading

0 comments on commit c8f95c4

Please sign in to comment.