From c8f95c4817a5b3168de1e632cc5b007068a7eadf Mon Sep 17 00:00:00 2001
From: Kevin Atkinson
Date: Mon, 15 Aug 2016 20:29:42 -0400
Subject: [PATCH] Add basic support for multiple blockstores as outlined in #3119.

Each datastore is mounted under a different mount point and a
multi-blockstore is used to check each mount point for the block. The
first mount checked by the multi-blockstore is considered the "cache";
all others are considered read-only. This implies that the garbage
collector only removes blocks from the first mount.

This change also factors out the pinlock from the blockstore into its
own structure. Only the multi-blockstore now implements the
GCBlockstore interface. In the future this could be separated out from
the blockstore completely.

For now, caching is only done on the first mount; in the future this
could be reworked. The bloom filter is the most problematic, as the
read-only mounts are not necessarily immutable and can be changed by
methods outside of the blockstore.

Right now there is only one mount, but that will soon change once
support for the filestore is added.

License: MIT
Signed-off-by: Kevin Atkinson
---
 blocks/blockstore/arc_cache_test.go   |   2 +-
 blocks/blockstore/blockstore.go       |  55 +++++++-----
 blocks/blockstore/blockstore_test.go  |   4 +-
 blocks/blockstore/bloom_cache_test.go |   8 +-
 blocks/blockstore/caching.go          |   4 +-
 blocks/blockstore/multi.go            | 122 ++++++++++++++++++++++++++
 blockservice/blockservice_test.go     |   6 +-
 core/builder.go                       |  14 ++-
 core/core.go                          |  10 +--
 repo/fsrepo/defaultds.go              |  39 +++++---
 repo/fsrepo/fsrepo.go                 |  31 ++++++-
 repo/mock.go                          |  13 +++
 repo/repo.go                          |   8 ++
 unixfs/mod/dagmodifier_test.go        |   2 +-
 14 files changed, 261 insertions(+), 57 deletions(-)
 create mode 100644 blocks/blockstore/multi.go

diff --git a/blocks/blockstore/arc_cache_test.go b/blocks/blockstore/arc_cache_test.go
index 42d388a16a3..32417fe9b80 100644
--- a/blocks/blockstore/arc_cache_test.go
+++ b/blocks/blockstore/arc_cache_test.go
@@ -13,7 +13,7 @@ import (
 
 var exampleBlock = blocks.NewBlock([]byte("foo"))
 
-func testArcCached(bs GCBlockstore, ctx context.Context) (*arccache, error) {
+func testArcCached(bs Blockstore, ctx context.Context) (*arccache, error) {
 	if ctx == nil {
 		ctx = context.TODO()
 	}
diff --git a/blocks/blockstore/blockstore.go b/blocks/blockstore/blockstore.go
index dfa35ec413a..02bd8704edf 100644
--- a/blocks/blockstore/blockstore.go
+++ b/blocks/blockstore/blockstore.go
@@ -21,7 +21,9 @@ import (
 var log = logging.Logger("blockstore")
 
 // BlockPrefix namespaces blockstore datastores
-var BlockPrefix = ds.NewKey("blocks")
+const DefaultPrefix = "/blocks"
+
+var blockPrefix = ds.NewKey(DefaultPrefix)
 
 var ValueTypeMismatch = errors.New("the retrieved value is not a Block")
 var ErrHashMismatch = errors.New("block in storage has different hash than requested")
@@ -39,9 +41,7 @@ type Blockstore interface {
 	AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error)
 }
 
-type GCBlockstore interface {
-	Blockstore
-
+type GCLocker interface {
 	// GCLock locks the blockstore for garbage collection. No operations
 	// that expect to finish with a pin should ocurr simultaneously.
 	// Reading during GC is safe, and requires no lock.
@@ -58,21 +58,32 @@ type GCBlockstore interface {
 	GCRequested() bool
 }
 
+type GCBlockstore interface {
+	Blockstore
+	GCLocker
+}
+
 func NewBlockstore(d ds.Batching) *blockstore {
+	return NewBlockstoreWPrefix(d, "")
+}
+
+func NewBlockstoreWPrefix(d ds.Batching, prefix string) *blockstore {
+	if prefix == "" {
+		prefix = DefaultPrefix
+	}
 	var dsb ds.Batching
-	dd := dsns.Wrap(d, BlockPrefix)
+	prefixKey := ds.NewKey(prefix)
+	dd := dsns.Wrap(d, prefixKey)
 	dsb = dd
 	return &blockstore{
 		datastore: dsb,
+		prefix:    prefixKey,
 	}
 }
 
 type blockstore struct {
 	datastore ds.Batching
-
-	lk      sync.RWMutex
-	gcreq   int32
-	gcreqlk sync.Mutex
+	prefix    ds.Key
 
 	rehash bool
 }
@@ -114,11 +125,8 @@ func (bs *blockstore) Get(k *cid.Cid) (blocks.Block, error) {
 func (bs *blockstore) Put(block blocks.Block) error {
 	k := dshelp.NewKeyFromBinary(block.Cid().KeyString())
 
-	// Has is cheaper than Put, so see if we already have it
-	exists, err := bs.datastore.Has(k)
-	if err == nil && exists {
-		return nil // already stored.
-	}
+	// Note: The Has Check is now done by the MultiBlockstore
+
 	return bs.datastore.Put(k, block.RawData())
 }
 
@@ -129,11 +137,6 @@ func (bs *blockstore) PutMany(blocks []blocks.Block) error {
 	}
 	for _, b := range blocks {
 		k := dshelp.NewKeyFromBinary(b.Cid().KeyString())
-		exists, err := bs.datastore.Has(k)
-		if err == nil && exists {
-			continue
-		}
-
 		err = t.Put(k, b.RawData())
 		if err != nil {
 			return err
@@ -159,7 +162,7 @@ func (bs *blockstore) AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error)
 	// KeysOnly, because that would be _a lot_ of data.
 	q := dsq.Query{KeysOnly: true}
 	// datastore/namespace does *NOT* fix up Query.Prefix
-	q.Prefix = BlockPrefix.String()
+	q.Prefix = bs.prefix.String()
 	res, err := bs.datastore.Query(q)
 	if err != nil {
 		return nil, err
@@ -224,6 +227,12 @@ func (bs *blockstore) AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error)
 	return output, nil
 }
 
+type gclocker struct {
+	lk      sync.RWMutex
+	gcreq   int32
+	gcreqlk sync.Mutex
+}
+
 type Unlocker interface {
 	Unlock()
 }
@@ -237,18 +246,18 @@ func (u *unlocker) Unlock() {
 	u.unlock = nil // ensure its not called twice
 }
 
-func (bs *blockstore) GCLock() Unlocker {
+func (bs *gclocker) GCLock() Unlocker {
 	atomic.AddInt32(&bs.gcreq, 1)
 	bs.lk.Lock()
 	atomic.AddInt32(&bs.gcreq, -1)
 	return &unlocker{bs.lk.Unlock}
 }
 
-func (bs *blockstore) PinLock() Unlocker {
+func (bs *gclocker) PinLock() Unlocker {
 	bs.lk.RLock()
 	return &unlocker{bs.lk.RUnlock}
 }
 
-func (bs *blockstore) GCRequested() bool {
+func (bs *gclocker) GCRequested() bool {
 	return atomic.LoadInt32(&bs.gcreq) > 0
 }
diff --git a/blocks/blockstore/blockstore_test.go b/blocks/blockstore/blockstore_test.go
index a5ecefd4417..5690c5553d8 100644
--- a/blocks/blockstore/blockstore_test.go
+++ b/blocks/blockstore/blockstore_test.go
@@ -170,7 +170,7 @@ func TestAllKeysRespectsContext(t *testing.T) {
 		default:
 		}
 
-		e := dsq.Entry{Key: BlockPrefix.ChildString("foo").String()}
+		e := dsq.Entry{Key: blockPrefix.ChildString("foo").String()}
 		resultChan <- dsq.Result{Entry: e} // let it go.
 		close(resultChan)
 		<-done // should be done now.
@@ -190,7 +190,7 @@ func TestValueTypeMismatch(t *testing.T) {
 	block := blocks.NewBlock([]byte("some data"))
 
 	datastore := ds.NewMapDatastore()
-	k := BlockPrefix.Child(dshelp.NewKeyFromBinary(block.Cid().KeyString()))
+	k := blockPrefix.Child(dshelp.NewKeyFromBinary(block.Cid().KeyString()))
 	datastore.Put(k, "data that isn't a block!")
 
 	blockstore := NewBlockstore(ds_sync.MutexWrap(datastore))
diff --git a/blocks/blockstore/bloom_cache_test.go b/blocks/blockstore/bloom_cache_test.go
index 8bdf567f07a..0ee3a557a5c 100644
--- a/blocks/blockstore/bloom_cache_test.go
+++ b/blocks/blockstore/bloom_cache_test.go
@@ -14,7 +14,7 @@ import (
 	syncds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore/sync"
 )
 
-func testBloomCached(bs GCBlockstore, ctx context.Context) (*bloomcache, error) {
+func testBloomCached(bs Blockstore, ctx context.Context) (*bloomcache, error) {
 	if ctx == nil {
 		ctx = context.TODO()
 	}
@@ -104,11 +104,11 @@ func TestHasIsBloomCached(t *testing.T) {
 
 	block := blocks.NewBlock([]byte("newBlock"))
 
 	cachedbs.PutMany([]blocks.Block{block})
-	if cacheFails != 2 {
-		t.Fatalf("expected two datastore hits: %d", cacheFails)
+	if cacheFails != 1 {
+		t.Fatalf("expected datastore hits: %d", cacheFails)
 	}
 	cachedbs.Put(block)
-	if cacheFails != 3 {
+	if cacheFails != 2 {
 		t.Fatalf("expected datastore hit: %d", cacheFails)
 	}
 
diff --git a/blocks/blockstore/caching.go b/blocks/blockstore/caching.go
index d28401cf8a4..d19f4782267 100644
--- a/blocks/blockstore/caching.go
+++ b/blocks/blockstore/caching.go
@@ -22,8 +22,8 @@ func DefaultCacheOpts() CacheOpts {
 	}
 }
 
-func CachedBlockstore(bs GCBlockstore,
-	ctx context.Context, opts CacheOpts) (cbs GCBlockstore, err error) {
+func CachedBlockstore(bs Blockstore,
+	ctx context.Context, opts CacheOpts) (cbs Blockstore, err error) {
 	cbs = bs
 
 	if opts.HasBloomFilterSize < 0 || opts.HasBloomFilterHashes < 0 ||
diff --git a/blocks/blockstore/multi.go b/blocks/blockstore/multi.go
new file mode 100644
index 00000000000..33c3b3173f5
--- /dev/null
+++ b/blocks/blockstore/multi.go
@@ -0,0 +1,122 @@
+package blockstore
+
+// A very simple multi-blockstore that is analogous to a unionfs.
+// Put and DeleteBlock only go to the first blockstore; all others
+// are considered readonly.
+
+import (
+	//"errors"
+	"context"
+
+	blocks "github.com/ipfs/go-ipfs/blocks"
+	cid "gx/ipfs/QmXUuRadqDq5BuFWzVU6VuKaSjTcNm1gNCtLvvP1TJCW4z/go-cid"
+)
+
+type MultiBlockstore interface {
+	Blockstore
+	GCLocker
+	FirstMount() Blockstore
+	Mounts() []string
+	Mount(prefix string) Blockstore
+}
+
+type Mount struct {
+	Prefix string
+	Blocks Blockstore
+}
+
+func NewMultiBlockstore(mounts ...Mount) *multiblockstore {
+	return &multiblockstore{
+		mounts: mounts,
+	}
+}
+
+type multiblockstore struct {
+	mounts []Mount
+	gclocker
+}
+
+func (bs *multiblockstore) FirstMount() Blockstore {
+	return bs.mounts[0].Blocks
+}
+
+func (bs *multiblockstore) Mounts() []string {
+	mounts := make([]string, 0, len(bs.mounts))
+	for _, mnt := range bs.mounts {
+		mounts = append(mounts, mnt.Prefix)
+	}
+	return mounts
+}
+
+func (bs *multiblockstore) Mount(prefix string) Blockstore {
+	for _, m := range bs.mounts {
+		if m.Prefix == prefix {
+			return m.Blocks
+		}
+	}
+	return nil
+}
+
+func (bs *multiblockstore) DeleteBlock(key *cid.Cid) error {
+	return bs.mounts[0].Blocks.DeleteBlock(key)
+}
+
+func (bs *multiblockstore) Has(c *cid.Cid) (bool, error) {
+	var firstErr error
+	for _, m := range bs.mounts {
+		have, err := m.Blocks.Has(c)
+		if have && err == nil {
+			return have, nil
+		}
+		if err != nil && firstErr == nil {
+			firstErr = err
+		}
+	}
+	return false, firstErr
+}
+
+func (bs *multiblockstore) Get(c *cid.Cid) (blocks.Block, error) {
+	var firstErr error
+	for _, m := range bs.mounts {
+		blk, err := m.Blocks.Get(c)
+		if err == nil {
+			return blk, nil
+		}
+		if firstErr == nil || firstErr == ErrNotFound {
+			firstErr = err
+		}
+	}
+	return nil, firstErr
+}
+
+func (bs *multiblockstore) Put(blk blocks.Block) error {
+	// First call Has() to make sure the block doesn't exist in any of
+	// the sub-blockstores, otherwise we could end up with data being
+	// duplicated in two blockstores.
+	exists, err := bs.Has(blk.Cid())
+	if err == nil && exists {
+		return nil // already stored
+	}
+	return bs.mounts[0].Blocks.Put(blk)
+}
+
+func (bs *multiblockstore) PutMany(blks []blocks.Block) error {
+	stilladd := make([]blocks.Block, 0, len(blks))
+	// Has is cheaper than Put, so if we already have it then skip
+	for _, blk := range blks {
+		exists, err := bs.Has(blk.Cid())
+		if err == nil && exists {
+			continue // already stored
+		}
+		stilladd = append(stilladd, blk)
+	}
+	if len(stilladd) == 0 {
+		return nil
+	}
+	return bs.mounts[0].Blocks.PutMany(stilladd)
+}
+
+func (bs *multiblockstore) AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error) {
+	return bs.mounts[0].Blocks.AllKeysChan(ctx)
+	//return nil, errors.New("Unimplemented")
+}
diff --git a/blockservice/blockservice_test.go b/blockservice/blockservice_test.go
index d87a383e566..0415f8213d2 100644
--- a/blockservice/blockservice_test.go
+++ b/blockservice/blockservice_test.go
@@ -36,14 +36,14 @@ func TestWriteThroughWorks(t *testing.T) {
 	}
 }
 
-var _ blockstore.GCBlockstore = (*PutCountingBlockstore)(nil)
+var _ blockstore.Blockstore = (*PutCountingBlockstore)(nil)
 
 type PutCountingBlockstore struct {
-	blockstore.GCBlockstore
+	blockstore.Blockstore
 	PutCounter int
 }
 
 func (bs *PutCountingBlockstore) Put(block blocks.Block) error {
 	bs.PutCounter++
-	return bs.GCBlockstore.Put(block)
+	return bs.Blockstore.Put(block)
 }
diff --git a/core/builder.go b/core/builder.go
index 4ddde171d95..329d3a42267 100644
--- a/core/builder.go
+++ b/core/builder.go
@@ -16,6 +16,7 @@ import (
 	pin "github.com/ipfs/go-ipfs/pin"
 	repo "github.com/ipfs/go-ipfs/repo"
 	cfg "github.com/ipfs/go-ipfs/repo/config"
+	fsrepo "github.com/ipfs/go-ipfs/repo/fsrepo"
 
 	context "context"
 	retry "gx/ipfs/QmPF5kxTYFkzhaY5LmkExood7aTTZBHWQC6cjdDQBuGrjp/retry-datastore"
@@ -167,7 +168,7 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {
 	}
 
 	var err error
-	bs := bstore.NewBlockstore(rds)
+	bs := bstore.NewBlockstoreWPrefix(rds, fsrepo.CacheMount)
 	opts := bstore.DefaultCacheOpts()
 	conf, err := n.Repo.Config()
 	if err != nil {
@@ -179,11 +180,20 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {
 		opts.HasBloomFilterSize = 0
 	}
 
-	n.Blockstore, err = bstore.CachedBlockstore(bs, ctx, opts)
+	cbs, err := bstore.CachedBlockstore(bs, ctx, opts)
 	if err != nil {
 		return err
 	}
 
+	mounts := []bstore.Mount{{fsrepo.CacheMount, cbs}}
+
+	if n.Repo.DirectMount(fsrepo.FilestoreMount) != nil {
+		fs := bstore.NewBlockstoreWPrefix(n.Repo.Datastore(), fsrepo.FilestoreMount)
+		mounts = append(mounts, bstore.Mount{fsrepo.FilestoreMount, fs})
+	}
+
+	n.Blockstore = bstore.NewMultiBlockstore(mounts...)
+
 	rcfg, err := n.Repo.Config()
 	if err != nil {
 		return err
diff --git a/core/core.go b/core/core.go
index 1f4e59bbadd..2e40daf13cb 100644
--- a/core/core.go
+++ b/core/core.go
@@ -94,11 +94,11 @@ type IpfsNode struct {
 	PrivateKey ic.PrivKey // the local node's private Key
 
 	// Services
-	Peerstore  pstore.Peerstore     // storage for other Peer instances
-	Blockstore bstore.GCBlockstore  // the block store (lower level)
-	Blocks     bserv.BlockService   // the block service, get/add blocks.
-	DAG        merkledag.DAGService // the merkle dag service, get/add objects.
-	Resolver   *path.Resolver       // the path resolution system
+	Peerstore  pstore.Peerstore       // storage for other Peer instances
+	Blockstore bstore.MultiBlockstore // the block store (lower level)
+	Blocks     bserv.BlockService     // the block service, get/add blocks.
+	DAG        merkledag.DAGService   // the merkle dag service, get/add objects.
+	Resolver   *path.Resolver         // the path resolution system
 	Reporter   metrics.Reporter
 	Discovery  discovery.Service
 	FilesRoot  *mfs.Root
diff --git a/repo/fsrepo/defaultds.go b/repo/fsrepo/defaultds.go
index ed8fbafe702..c6691b77263 100644
--- a/repo/fsrepo/defaultds.go
+++ b/repo/fsrepo/defaultds.go
@@ -9,6 +9,7 @@ import (
 	"github.com/ipfs/go-ipfs/thirdparty/dir"
 	"gx/ipfs/QmU4VzzKNLJXJ72SedXBQKyf5Jo8W89iWpbWQjHn9qef8N/go-ds-flatfs"
 	levelds "gx/ipfs/QmUHmMGmcwCrjHQHcYhBnqGCSWs5pBSMbGZmfwavETR1gg/go-ds-leveldb"
+	//multi "github.com/ipfs/go-ipfs/repo/multi"
 	ldbopts "gx/ipfs/QmbBhyDKsY4mbY6xsKt3qu9Y7FPvMJ6qbD8AMjYYvPRw1g/goleveldb/leveldb/opt"
 	ds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore"
 	mount "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore/syncmount"
@@ -20,7 +21,13 @@ const (
 	flatfsDirectory = "blocks"
 )
 
-func openDefaultDatastore(r *FSRepo) (repo.Datastore, error) {
+const (
+	RootMount      = "/"
+	CacheMount     = "/blocks" // needs to be the same as blockstore.DefaultPrefix
+	FilestoreMount = "/filestore"
+)
+
+func openDefaultDatastore(r *FSRepo) (repo.Datastore, []Mount, error) {
 	leveldbPath := path.Join(r.path, leveldbDirectory)
 
 	// save leveldb reference so it can be neatly closed afterward
@@ -28,7 +35,7 @@
 		Compression: ldbopts.NoCompression,
 	})
 	if err != nil {
-		return nil, fmt.Errorf("unable to open leveldb datastore: %v", err)
+		return nil, nil, fmt.Errorf("unable to open leveldb datastore: %v", err)
 	}
 
 	syncfs := !r.config.Datastore.NoSync
@@ -36,7 +43,7 @@
 	// 5 bytes of prefix gives us 25 bits of freedom, 16 of which are taken
 	// by the Qm prefix. Leaving us with 9 bits, or 512 way sharding
 	blocksDS, err := flatfs.New(path.Join(r.path, flatfsDirectory), 5, syncfs)
 	if err != nil {
-		return nil, fmt.Errorf("unable to open flatfs datastore: %v", err)
+		return nil, nil, fmt.Errorf("unable to open flatfs datastore: %v", err)
 	}
 	// Add our PeerID to metrics paths to keep them unique
@@ -51,18 +58,24 @@
 	prefix := "fsrepo." + id + ".datastore."
 	metricsBlocks := measure.New(prefix+"blocks", blocksDS)
 	metricsLevelDB := measure.New(prefix+"leveldb", leveldbDS)
-	mountDS := mount.New([]mount.Mount{
-		{
-			Prefix:    ds.NewKey("/blocks"),
-			Datastore: metricsBlocks,
-		},
-		{
-			Prefix:    ds.NewKey("/"),
-			Datastore: metricsLevelDB,
-		},
+
+	var mounts []mount.Mount
+	var directMounts []Mount
+
+	mounts = append(mounts, mount.Mount{
+		Prefix:    ds.NewKey(CacheMount),
+		Datastore: metricsBlocks,
+	})
+	directMounts = append(directMounts, Mount{CacheMount, blocksDS})
+	mounts = append(mounts, mount.Mount{
+		Prefix:    ds.NewKey(RootMount),
+		Datastore: metricsLevelDB,
+	})
+	directMounts = append(directMounts, Mount{RootMount, leveldbDS})
+
+	mountDS := mount.New(mounts)
 	})
 
-	return mountDS, nil
+	return mountDS, directMounts, nil
 }
 
 func initDefaultDatastore(repoPath string, conf *config.Config) error {
diff --git a/repo/fsrepo/fsrepo.go b/repo/fsrepo/fsrepo.go
index 3b20a51283f..a736fb90192 100644
--- a/repo/fsrepo/fsrepo.go
+++ b/repo/fsrepo/fsrepo.go
@@ -20,6 +20,7 @@ import (
 	dir "github.com/ipfs/go-ipfs/thirdparty/dir"
 	logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
 	util "gx/ipfs/Qmb912gdngC1UWwTkhuW8knyRbcWeu5kqkxBpveLmW8bSr/go-ipfs-util"
+	ds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore"
 	"gx/ipfs/QmeqtHtxGfcsfXiou7wqHJARWPKUTUcPdtSfSYYHp48dtQ/go-ds-measure"
 )
 
@@ -93,6 +94,12 @@ type FSRepo struct {
 	lockfile io.Closer
 	config   *config.Config
 	ds       repo.Datastore
+	mounts   []Mount
+}
+
+type Mount struct {
+	prefix string
+	dstore ds.Datastore
 }
 
 var _ repo.Repo = (*FSRepo)(nil)
@@ -331,11 +338,12 @@ func (r *FSRepo) openConfig() error {
 func (r *FSRepo) openDatastore() error {
 	switch r.config.Datastore.Type {
 	case "default", "leveldb", "":
-		d, err := openDefaultDatastore(r)
+		d, m, err := openDefaultDatastore(r)
 		if err != nil {
 			return err
 		}
 		r.ds = d
+		r.mounts = m
 	default:
 		return fmt.Errorf("unknown datastore type: %s", r.config.Datastore.Type)
 	}
@@ -557,6 +565,27 @@ func (r *FSRepo) Datastore() repo.Datastore {
 	return d
 }
 
+func (r *FSRepo) DirectMount(prefix string) ds.Datastore {
+	packageLock.Lock()
+	defer packageLock.Unlock()
+	for _, m := range r.mounts {
+		if prefix == m.prefix {
+			return m.dstore
+		}
+	}
+	return nil
+}
+
+func (r *FSRepo) Mounts() []string {
+	packageLock.Lock()
+	mounts := make([]string, 0, len(r.mounts))
+	for _, m := range r.mounts {
+		mounts = append(mounts, m.prefix)
+	}
+	packageLock.Unlock()
+	return mounts
+}
+
 // GetStorageUsage computes the storage space taken by the repo in bytes
 func (r *FSRepo) GetStorageUsage() (uint64, error) {
 	pth, err := config.PathRoot()
diff --git a/repo/mock.go b/repo/mock.go
index 8190a0bda1b..f68e078cfbc 100644
--- a/repo/mock.go
+++ b/repo/mock.go
@@ -4,6 +4,7 @@ import (
 	"errors"
 
 	"github.com/ipfs/go-ipfs/repo/config"
+	ds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore"
 )
 
 var errTODO = errors.New("TODO: mock repo")
@@ -33,6 +34,18 @@ func (m *Mock) GetConfigKey(key string) (interface{}, error) {
 
 func (m *Mock) Datastore() Datastore { return m.D }
 
+func (m *Mock) DirectMount(prefix string) ds.Datastore {
+	if prefix == "/" {
+		return m.D
+	} else {
+		return nil
+	}
+}
+
+func (m *Mock) Mounts() []string {
+	return []string{"/"}
+}
+
 func (m *Mock) GetStorageUsage() (uint64, error) { return 0, nil }
 
 func (m *Mock) Close() error { return errTODO }
diff --git a/repo/repo.go b/repo/repo.go
index d95af0446dd..633ff57114b 100644
--- a/repo/repo.go
+++ b/repo/repo.go
@@ -22,6 +22,14 @@ type Repo interface {
 	Datastore() Datastore
 	GetStorageUsage() (uint64, error)
 
+	// DirectMount provides direct access to a datastore mounted
+	// under prefix in order to perform low-level operations. The
+	// datastore returned is guaranteed not to be a proxy (such as
+	// a go-datastore/measure wrapper); normal operations should go
+	// through Datastore().
+	DirectMount(prefix string) ds.Datastore
+	Mounts() []string
+
 	// SetAPIAddr sets the API address in the repo.
 	SetAPIAddr(addr string) error
 
diff --git a/unixfs/mod/dagmodifier_test.go b/unixfs/mod/dagmodifier_test.go
index 810ec6f2344..9c7ac89d7be 100644
--- a/unixfs/mod/dagmodifier_test.go
+++ b/unixfs/mod/dagmodifier_test.go
@@ -22,7 +22,7 @@ import (
 	"gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore/sync"
 )
 
-func getMockDagServAndBstore(t testing.TB) (mdag.DAGService, blockstore.GCBlockstore) {
+func getMockDagServAndBstore(t testing.TB) (mdag.DAGService, blockstore.Blockstore) {
 	dstore := ds.NewMapDatastore()
 	tsds := sync.MutexWrap(dstore)
 	bstore := blockstore.NewBlockstore(tsds)
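
Usage sketch (illustrative only, not part of the patch): the snippet below wires two prefixed blockstores into the new MultiBlockstore, roughly mirroring the setupNode changes in core/builder.go above. The import paths for go-datastore are shortened here (the tree itself uses gx-rewritten paths), and the in-memory map datastore and the "/filestore" mount are stand-in assumptions for illustration, not exact build instructions.

package main

import (
	"context"
	"fmt"

	blocks "github.com/ipfs/go-ipfs/blocks"
	bstore "github.com/ipfs/go-ipfs/blocks/blockstore"
	ds "github.com/ipfs/go-datastore"
	dssync "github.com/ipfs/go-datastore/sync"
)

func main() {
	// One shared datastore; each blockstore namespaces its keys under its
	// own prefix, the way fsrepo mounts the block storage under "/blocks".
	d := dssync.MutexWrap(ds.NewMapDatastore())

	cache := bstore.NewBlockstoreWPrefix(d, "/blocks")        // first mount: the writable "cache"
	filestore := bstore.NewBlockstoreWPrefix(d, "/filestore") // read-only from the multi-blockstore's view

	mbs := bstore.NewMultiBlockstore(
		bstore.Mount{Prefix: "/blocks", Blocks: cache},
		bstore.Mount{Prefix: "/filestore", Blocks: filestore},
	)

	blk := blocks.NewBlock([]byte("hello"))

	// Put checks Has() across every mount first, then writes to the first
	// mount only; DeleteBlock likewise only touches the first mount.
	if err := mbs.Put(blk); err != nil {
		fmt.Println("put failed:", err)
	}

	// Get and Has consult each mount in order and return the first hit.
	got, err := mbs.Get(blk.Cid())
	fmt.Println(got, err)

	// AllKeysChan currently only enumerates the first mount.
	ch, _ := mbs.AllKeysChan(context.Background())
	for c := range ch {
		fmt.Println("key:", c)
	}
}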