From 3b752ebb998eccbe4288a777f19f3516db96aac6 Mon Sep 17 00:00:00 2001 From: Kevin Atkinson Date: Mon, 15 Aug 2016 20:29:42 -0400 Subject: [PATCH 1/7] Add basic support for multiple blockstores as outlined in #3119. Each datastore is mounted under a different mount point and a multi-blockstore is used to check each mount point for the block. The first mount checked of the multi-blockstore is considered the "cache", all others are considered read-only. This implies that the garbage collector only removes block from the first mount. This change also factors out the pinlock from the blockstore into its own structure. Only the multi-datastore now implements the GCBlockstore interface. In the future this could be separated out from the blockstore completely. For now caching is only done on the first mount, in the future this could be reworked. The bloom filter is the most problematic as the read-only mounts are not necessary immutable and can be changed by methods outside of the blockstore. Right now there is only one mount, but that will soon change once support for the filestore is added. License: MIT Signed-off-by: Kevin Atkinson --- blocks/blockstore/blockstore.go | 34 +++---- blocks/blockstore/blockstore_test.go | 4 +- blocks/blockstore/bloom_cache_test.go | 6 +- blocks/blockstore/multi.go | 122 ++++++++++++++++++++++++++ core/builder.go | 7 +- core/core.go | 10 +-- repo/fsrepo/defaultds.go | 39 +++++--- repo/fsrepo/fsrepo.go | 31 ++++++- repo/mock.go | 13 +++ repo/repo.go | 8 ++ 10 files changed, 231 insertions(+), 43 deletions(-) create mode 100644 blocks/blockstore/multi.go diff --git a/blocks/blockstore/blockstore.go b/blocks/blockstore/blockstore.go index 274c1ee7b4f..4c53d9e4154 100644 --- a/blocks/blockstore/blockstore.go +++ b/blocks/blockstore/blockstore.go @@ -21,7 +21,9 @@ import ( var log = logging.Logger("blockstore") // BlockPrefix namespaces blockstore datastores -var BlockPrefix = ds.NewKey("blocks") +const DefaultPrefix = "/blocks" + +var blockPrefix = ds.NewKey(DefaultPrefix) var ValueTypeMismatch = errors.New("the retrieved value is not a Block") var ErrHashMismatch = errors.New("block in storage has different hash than requested") @@ -71,20 +73,26 @@ type gcBlockstore struct { } func NewBlockstore(d ds.Batching) *blockstore { + return NewBlockstoreWPrefix(d, "") +} + +func NewBlockstoreWPrefix(d ds.Batching, prefix string) *blockstore { + if prefix == "" { + prefix = DefaultPrefix + } var dsb ds.Batching - dd := dsns.Wrap(d, BlockPrefix) + prefixKey := ds.NewKey(prefix) + dd := dsns.Wrap(d, prefixKey) dsb = dd return &blockstore{ datastore: dsb, + prefix: prefixKey, } } type blockstore struct { datastore ds.Batching - - lk sync.RWMutex - gcreq int32 - gcreqlk sync.Mutex + prefix ds.Key rehash bool } @@ -130,11 +138,8 @@ func (bs *blockstore) Get(k *cid.Cid) (blocks.Block, error) { func (bs *blockstore) Put(block blocks.Block) error { k := dshelp.CidToDsKey(block.Cid()) - // Has is cheaper than Put, so see if we already have it - exists, err := bs.datastore.Has(k) - if err == nil && exists { - return nil // already stored. - } + // Note: The Has Check is now done by the MultiBlockstore + return bs.datastore.Put(k, block.RawData()) } @@ -145,11 +150,6 @@ func (bs *blockstore) PutMany(blocks []blocks.Block) error { } for _, b := range blocks { k := dshelp.CidToDsKey(b.Cid()) - exists, err := bs.datastore.Has(k) - if err == nil && exists { - continue - } - err = t.Put(k, b.RawData()) if err != nil { return err @@ -175,7 +175,7 @@ func (bs *blockstore) AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error) // KeysOnly, because that would be _a lot_ of data. q := dsq.Query{KeysOnly: true} // datastore/namespace does *NOT* fix up Query.Prefix - q.Prefix = BlockPrefix.String() + q.Prefix = bs.prefix.String() res, err := bs.datastore.Query(q) if err != nil { return nil, err diff --git a/blocks/blockstore/blockstore_test.go b/blocks/blockstore/blockstore_test.go index abe8a1a72d5..22c15d0004b 100644 --- a/blocks/blockstore/blockstore_test.go +++ b/blocks/blockstore/blockstore_test.go @@ -170,7 +170,7 @@ func TestAllKeysRespectsContext(t *testing.T) { default: } - e := dsq.Entry{Key: BlockPrefix.ChildString("foo").String()} + e := dsq.Entry{Key: blockPrefix.ChildString("foo").String()} resultChan <- dsq.Result{Entry: e} // let it go. close(resultChan) <-done // should be done now. @@ -190,7 +190,7 @@ func TestValueTypeMismatch(t *testing.T) { block := blocks.NewBlock([]byte("some data")) datastore := ds.NewMapDatastore() - k := BlockPrefix.Child(dshelp.CidToDsKey(block.Cid())) + k := blockPrefix.Child(dshelp.CidToDsKey(block.Cid())) datastore.Put(k, "data that isn't a block!") blockstore := NewBlockstore(ds_sync.MutexWrap(datastore)) diff --git a/blocks/blockstore/bloom_cache_test.go b/blocks/blockstore/bloom_cache_test.go index 72223cd44e0..0ee3a557a5c 100644 --- a/blocks/blockstore/bloom_cache_test.go +++ b/blocks/blockstore/bloom_cache_test.go @@ -104,11 +104,11 @@ func TestHasIsBloomCached(t *testing.T) { block := blocks.NewBlock([]byte("newBlock")) cachedbs.PutMany([]blocks.Block{block}) - if cacheFails != 2 { - t.Fatalf("expected two datastore hits: %d", cacheFails) + if cacheFails != 1 { + t.Fatalf("expected datastore hits: %d", cacheFails) } cachedbs.Put(block) - if cacheFails != 3 { + if cacheFails != 2 { t.Fatalf("expected datastore hit: %d", cacheFails) } diff --git a/blocks/blockstore/multi.go b/blocks/blockstore/multi.go new file mode 100644 index 00000000000..77d5ea61347 --- /dev/null +++ b/blocks/blockstore/multi.go @@ -0,0 +1,122 @@ +package blockstore + +// A very simple multi-blockstore that analogous to a unionfs Put and +// DeleteBlock only go to the first blockstore all others are +// considered readonly. + +import ( + //"errors" + "context" + + blocks "github.com/ipfs/go-ipfs/blocks" + cid "gx/ipfs/QmXfiyr2RWEXpVDdaYnD2HNiBk6UBddsvEP4RPfXb6nGqY/go-cid" +) + +type MultiBlockstore interface { + Blockstore + GCLocker + FirstMount() Blockstore + Mounts() []string + Mount(prefix string) Blockstore +} + +type Mount struct { + Prefix string + Blocks Blockstore +} + +func NewMultiBlockstore(mounts ...Mount) *multiblockstore { + return &multiblockstore{ + mounts: mounts, + } +} + +type multiblockstore struct { + mounts []Mount + gclocker +} + +func (bs *multiblockstore) FirstMount() Blockstore { + return bs.mounts[0].Blocks +} + +func (bs *multiblockstore) Mounts() []string { + mounts := make([]string, 0, len(bs.mounts)) + for _, mnt := range bs.mounts { + mounts = append(mounts, mnt.Prefix) + } + return mounts +} + +func (bs *multiblockstore) Mount(prefix string) Blockstore { + for _, m := range bs.mounts { + if m.Prefix == prefix { + return m.Blocks + } + } + return nil +} + +func (bs *multiblockstore) DeleteBlock(key *cid.Cid) error { + return bs.mounts[0].Blocks.DeleteBlock(key) +} + +func (bs *multiblockstore) Has(c *cid.Cid) (bool, error) { + var firstErr error + for _, m := range bs.mounts { + have, err := m.Blocks.Has(c) + if have && err == nil { + return have, nil + } + if err != nil && firstErr == nil { + firstErr = err + } + } + return false, firstErr +} + +func (bs *multiblockstore) Get(c *cid.Cid) (blocks.Block, error) { + var firstErr error + for _, m := range bs.mounts { + blk, err := m.Blocks.Get(c) + if err == nil { + return blk, nil + } + if firstErr == nil || firstErr == ErrNotFound { + firstErr = err + } + } + return nil, firstErr +} + +func (bs *multiblockstore) Put(blk blocks.Block) error { + // First call Has() to make sure the block doesn't exist in any of + // the sub-blockstores, otherwise we could end with data being + // duplicated in two blockstores. + exists, err := bs.Has(blk.Cid()) + if err == nil && exists { + return nil // already stored + } + return bs.mounts[0].Blocks.Put(blk) +} + +func (bs *multiblockstore) PutMany(blks []blocks.Block) error { + stilladd := make([]blocks.Block, 0, len(blks)) + // Has is cheaper than Put, so if we already have it then skip + for _, blk := range blks { + exists, err := bs.Has(blk.Cid()) + if err == nil && exists { + continue // already stored + } + stilladd = append(stilladd, blk) + } + if len(stilladd) == 0 { + return nil + } + return bs.mounts[0].Blocks.PutMany(stilladd) +} + +func (bs *multiblockstore) AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error) { + return bs.mounts[0].Blocks.AllKeysChan(ctx) + //return nil, errors.New("Unimplemented") +} diff --git a/core/builder.go b/core/builder.go index baef82ed06d..898d72440c8 100644 --- a/core/builder.go +++ b/core/builder.go @@ -16,6 +16,7 @@ import ( pin "github.com/ipfs/go-ipfs/pin" repo "github.com/ipfs/go-ipfs/repo" cfg "github.com/ipfs/go-ipfs/repo/config" + fsrepo "github.com/ipfs/go-ipfs/repo/fsrepo" context "context" retry "gx/ipfs/QmPF5kxTYFkzhaY5LmkExood7aTTZBHWQC6cjdDQBuGrjp/retry-datastore" @@ -167,7 +168,7 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error { } var err error - bs := bstore.NewBlockstore(rds) + bs := bstore.NewBlockstoreWPrefix(rds, fsrepo.CacheMount) opts := bstore.DefaultCacheOpts() conf, err := n.Repo.Config() if err != nil { @@ -184,7 +185,9 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error { return err } - n.Blockstore = bstore.NewGCBlockstore(cbs, bstore.NewGCLocker()) + mounts := []bstore.Mount{{fsrepo.CacheMount, cbs}} + + n.Blockstore = bstore.NewMultiBlockstore(mounts...) rcfg, err := n.Repo.Config() if err != nil { diff --git a/core/core.go b/core/core.go index 231b91ac6b0..16bd44689ac 100644 --- a/core/core.go +++ b/core/core.go @@ -95,11 +95,11 @@ type IpfsNode struct { PrivateKey ic.PrivKey // the local node's private Key // Services - Peerstore pstore.Peerstore // storage for other Peer instances - Blockstore bstore.GCBlockstore // the block store (lower level) - Blocks bserv.BlockService // the block service, get/add blocks. - DAG merkledag.DAGService // the merkle dag service, get/add objects. - Resolver *path.Resolver // the path resolution system + Peerstore pstore.Peerstore // storage for other Peer instances + Blockstore bstore.MultiBlockstore // the block store (lower level) + Blocks bserv.BlockService // the block service, get/add blocks. + DAG merkledag.DAGService // the merkle dag service, get/add objects. + Resolver *path.Resolver // the path resolution system Reporter metrics.Reporter Discovery discovery.Service FilesRoot *mfs.Root diff --git a/repo/fsrepo/defaultds.go b/repo/fsrepo/defaultds.go index ed8fbafe702..c6691b77263 100644 --- a/repo/fsrepo/defaultds.go +++ b/repo/fsrepo/defaultds.go @@ -9,6 +9,7 @@ import ( "github.com/ipfs/go-ipfs/thirdparty/dir" "gx/ipfs/QmU4VzzKNLJXJ72SedXBQKyf5Jo8W89iWpbWQjHn9qef8N/go-ds-flatfs" levelds "gx/ipfs/QmUHmMGmcwCrjHQHcYhBnqGCSWs5pBSMbGZmfwavETR1gg/go-ds-leveldb" + //multi "github.com/ipfs/go-ipfs/repo/multi" ldbopts "gx/ipfs/QmbBhyDKsY4mbY6xsKt3qu9Y7FPvMJ6qbD8AMjYYvPRw1g/goleveldb/leveldb/opt" ds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore" mount "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore/syncmount" @@ -20,7 +21,13 @@ const ( flatfsDirectory = "blocks" ) -func openDefaultDatastore(r *FSRepo) (repo.Datastore, error) { +const ( + RootMount = "/" + CacheMount = "/blocks" // needs to be the same as blockstore.DefaultPrefix + FilestoreMount = "/filestore" +) + +func openDefaultDatastore(r *FSRepo) (repo.Datastore, []Mount, error) { leveldbPath := path.Join(r.path, leveldbDirectory) // save leveldb reference so it can be neatly closed afterward @@ -28,7 +35,7 @@ func openDefaultDatastore(r *FSRepo) (repo.Datastore, error) { Compression: ldbopts.NoCompression, }) if err != nil { - return nil, fmt.Errorf("unable to open leveldb datastore: %v", err) + return nil, nil, fmt.Errorf("unable to open leveldb datastore: %v", err) } syncfs := !r.config.Datastore.NoSync @@ -36,7 +43,7 @@ func openDefaultDatastore(r *FSRepo) (repo.Datastore, error) { // by the Qm prefix. Leaving us with 9 bits, or 512 way sharding blocksDS, err := flatfs.New(path.Join(r.path, flatfsDirectory), 5, syncfs) if err != nil { - return nil, fmt.Errorf("unable to open flatfs datastore: %v", err) + return nil, nil, fmt.Errorf("unable to open flatfs datastore: %v", err) } // Add our PeerID to metrics paths to keep them unique @@ -51,18 +58,24 @@ func openDefaultDatastore(r *FSRepo) (repo.Datastore, error) { prefix := "fsrepo." + id + ".datastore." metricsBlocks := measure.New(prefix+"blocks", blocksDS) metricsLevelDB := measure.New(prefix+"leveldb", leveldbDS) - mountDS := mount.New([]mount.Mount{ - { - Prefix: ds.NewKey("/blocks"), - Datastore: metricsBlocks, - }, - { - Prefix: ds.NewKey("/"), - Datastore: metricsLevelDB, - }, + + var mounts []mount.Mount + var directMounts []Mount + + mounts = append(mounts, mount.Mount{ + Prefix: ds.NewKey(CacheMount), + Datastore: metricsBlocks, + }) + directMounts = append(directMounts, Mount{CacheMount, blocksDS}) + mounts = append(mounts, mount.Mount{ + Prefix: ds.NewKey(RootMount), + Datastore: metricsLevelDB, }) + directMounts = append(directMounts, Mount{RootMount, leveldbDS}) + + mountDS := mount.New(mounts) - return mountDS, nil + return mountDS, directMounts, nil } func initDefaultDatastore(repoPath string, conf *config.Config) error { diff --git a/repo/fsrepo/fsrepo.go b/repo/fsrepo/fsrepo.go index 03ac313e63b..847ee7777c6 100644 --- a/repo/fsrepo/fsrepo.go +++ b/repo/fsrepo/fsrepo.go @@ -20,6 +20,7 @@ import ( dir "github.com/ipfs/go-ipfs/thirdparty/dir" logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log" util "gx/ipfs/Qmb912gdngC1UWwTkhuW8knyRbcWeu5kqkxBpveLmW8bSr/go-ipfs-util" + ds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore" "gx/ipfs/QmeqtHtxGfcsfXiou7wqHJARWPKUTUcPdtSfSYYHp48dtQ/go-ds-measure" ) @@ -93,6 +94,12 @@ type FSRepo struct { lockfile io.Closer config *config.Config ds repo.Datastore + mounts []Mount +} + +type Mount struct { + prefix string + dstore ds.Datastore } var _ repo.Repo = (*FSRepo)(nil) @@ -331,11 +338,12 @@ func (r *FSRepo) openConfig() error { func (r *FSRepo) openDatastore() error { switch r.config.Datastore.Type { case "default", "leveldb", "": - d, err := openDefaultDatastore(r) + d, m, err := openDefaultDatastore(r) if err != nil { return err } r.ds = d + r.mounts = m default: return fmt.Errorf("unknown datastore type: %s", r.config.Datastore.Type) } @@ -557,6 +565,27 @@ func (r *FSRepo) Datastore() repo.Datastore { return d } +func (r *FSRepo) DirectMount(prefix string) ds.Datastore { + packageLock.Lock() + defer packageLock.Unlock() + for _, m := range r.mounts { + if prefix == m.prefix { + return m.dstore + } + } + return nil +} + +func (r *FSRepo) Mounts() []string { + packageLock.Lock() + mounts := make([]string, 0, len(r.mounts)) + for _, m := range r.mounts { + mounts = append(mounts, m.prefix) + } + packageLock.Unlock() + return mounts +} + // GetStorageUsage computes the storage space taken by the repo in bytes func (r *FSRepo) GetStorageUsage() (uint64, error) { pth, err := config.PathRoot() diff --git a/repo/mock.go b/repo/mock.go index 8190a0bda1b..f68e078cfbc 100644 --- a/repo/mock.go +++ b/repo/mock.go @@ -4,6 +4,7 @@ import ( "errors" "github.com/ipfs/go-ipfs/repo/config" + ds "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore" ) var errTODO = errors.New("TODO: mock repo") @@ -33,6 +34,18 @@ func (m *Mock) GetConfigKey(key string) (interface{}, error) { func (m *Mock) Datastore() Datastore { return m.D } +func (m *Mock) DirectMount(prefix string) ds.Datastore { + if prefix == "/" { + return m.D + } else { + return nil + } +} + +func (m *Mock) Mounts() []string { + return []string{"/"} +} + func (m *Mock) GetStorageUsage() (uint64, error) { return 0, nil } func (m *Mock) Close() error { return errTODO } diff --git a/repo/repo.go b/repo/repo.go index d95af0446dd..633ff57114b 100644 --- a/repo/repo.go +++ b/repo/repo.go @@ -22,6 +22,14 @@ type Repo interface { Datastore() Datastore GetStorageUsage() (uint64, error) + // DirectMount provides direct access to a datastore mounted + // under prefix in order to perform low-level operations. The + // datastore returned is guaranteed not be a proxy (such as a + // go-datastore/measure) normal operations should go through + // Datastore() + DirectMount(prefix string) ds.Datastore + Mounts() []string + // SetAPIAddr sets the API address in the repo. SetAPIAddr(addr string) error From a93b1528f0cc9e975644eb2e620042feb3b0df93 Mon Sep 17 00:00:00 2001 From: Kevin Atkinson Date: Wed, 17 Aug 2016 16:43:36 -0400 Subject: [PATCH 2/7] Implement multiblockstore.AllKeysChan(), GC now uses the first mount. License: MIT Signed-off-by: Kevin Atkinson --- blocks/blockstore/multi.go | 25 +++++++++++++++++++++++-- pin/gc/gc.go | 5 +++-- 2 files changed, 26 insertions(+), 4 deletions(-) diff --git a/blocks/blockstore/multi.go b/blocks/blockstore/multi.go index 77d5ea61347..f866c737893 100644 --- a/blocks/blockstore/multi.go +++ b/blocks/blockstore/multi.go @@ -10,6 +10,7 @@ import ( blocks "github.com/ipfs/go-ipfs/blocks" cid "gx/ipfs/QmXfiyr2RWEXpVDdaYnD2HNiBk6UBddsvEP4RPfXb6nGqY/go-cid" + dsq "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore/query" ) type MultiBlockstore interface { @@ -102,7 +103,9 @@ func (bs *multiblockstore) Put(blk blocks.Block) error { func (bs *multiblockstore) PutMany(blks []blocks.Block) error { stilladd := make([]blocks.Block, 0, len(blks)) - // Has is cheaper than Put, so if we already have it then skip + // First call Has() to make sure the block doesn't exist in any of + // the sub-blockstores, otherwise we could end with data being + // duplicated in two blockstores. for _, blk := range blks { exists, err := bs.Has(blk.Cid()) if err == nil && exists { @@ -117,6 +120,24 @@ func (bs *multiblockstore) PutMany(blks []blocks.Block) error { } func (bs *multiblockstore) AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error) { - return bs.mounts[0].Blocks.AllKeysChan(ctx) + //return bs.mounts[0].Blocks.AllKeysChan(ctx) //return nil, errors.New("Unimplemented") + in := make([]<-chan *cid.Cid, 0, len(bs.mounts)) + for _, m := range bs.mounts { + ch, err := m.Blocks.AllKeysChan(ctx) + if err != nil { + return nil, err + } + in = append(in, ch) + } + out := make(chan *cid.Cid, dsq.KeysOnlyBufSize) + go func() { + defer close(out) + for _, in0 := range in { + for key := range in0 { + out <- key + } + } + }() + return out, nil } diff --git a/pin/gc/gc.go b/pin/gc/gc.go index d2607bdbef5..286f4e0f2d4 100644 --- a/pin/gc/gc.go +++ b/pin/gc/gc.go @@ -22,7 +22,7 @@ var log = logging.Logger("gc") // // The routine then iterates over every block in the blockstore and // deletes any block that is not found in the marked set. -func GC(ctx context.Context, bs bstore.GCBlockstore, ls dag.LinkService, pn pin.Pinner, bestEffortRoots []*cid.Cid) (<-chan *cid.Cid, error) { +func GC(ctx context.Context, bs bstore.MultiBlockstore, ls dag.LinkService, pn pin.Pinner, bestEffortRoots []*cid.Cid) (<-chan *cid.Cid, error) { unlocker := bs.GCLock() ls = ls.GetOfflineLinkService() @@ -32,7 +32,8 @@ func GC(ctx context.Context, bs bstore.GCBlockstore, ls dag.LinkService, pn pin. return nil, err } - keychan, err := bs.AllKeysChan(ctx) + // only delete blocks in the first (cache) mount + keychan, err := bs.FirstMount().AllKeysChan(ctx) if err != nil { return nil, err } From d31cb4ba8a6f1ac4ebfd5628ee5297420f0aed8f Mon Sep 17 00:00:00 2001 From: Kevin Atkinson Date: Sat, 20 Aug 2016 23:05:36 -0400 Subject: [PATCH 3/7] Add "block locate" command. License: MIT Signed-off-by: Kevin Atkinson --- blocks/blockstore/multi.go | 15 ++++++++ core/commands/block.go | 79 ++++++++++++++++++++++++++++++++++++-- 2 files changed, 90 insertions(+), 4 deletions(-) diff --git a/blocks/blockstore/multi.go b/blocks/blockstore/multi.go index f866c737893..2d260b72ff6 100644 --- a/blocks/blockstore/multi.go +++ b/blocks/blockstore/multi.go @@ -13,12 +13,18 @@ import ( dsq "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore/query" ) +type LocateInfo struct { + Prefix string + Error error +} + type MultiBlockstore interface { Blockstore GCLocker FirstMount() Blockstore Mounts() []string Mount(prefix string) Blockstore + Locate(*cid.Cid) []LocateInfo } type Mount struct { @@ -90,6 +96,15 @@ func (bs *multiblockstore) Get(c *cid.Cid) (blocks.Block, error) { return nil, firstErr } +func (bs *multiblockstore) Locate(c *cid.Cid) []LocateInfo { + res := make([]LocateInfo, 0, len(bs.mounts)) + for _, m := range bs.mounts { + _, err := m.Blocks.Get(c) + res = append(res, LocateInfo{m.Prefix, err}) + } + return res +} + func (bs *multiblockstore) Put(blk blocks.Block) error { // First call Has() to make sure the block doesn't exist in any of // the sub-blockstores, otherwise we could end with data being diff --git a/core/commands/block.go b/core/commands/block.go index 54b447d2ee1..4fa7a238e8f 100644 --- a/core/commands/block.go +++ b/core/commands/block.go @@ -8,6 +8,7 @@ import ( "strings" "github.com/ipfs/go-ipfs/blocks" + bs "github.com/ipfs/go-ipfs/blocks/blockstore" util "github.com/ipfs/go-ipfs/blocks/blockstore/util" cmds "github.com/ipfs/go-ipfs/commands" @@ -36,10 +37,11 @@ multihash. }, Subcommands: map[string]*cmds.Command{ - "stat": blockStatCmd, - "get": blockGetCmd, - "put": blockPutCmd, - "rm": blockRmCmd, + "stat": blockStatCmd, + "get": blockGetCmd, + "put": blockPutCmd, + "rm": blockRmCmd, + "locate": blockLocateCmd, }, } @@ -285,3 +287,72 @@ It takes a list of base58 encoded multihashs to remove. }, Type: util.RemovedBlock{}, } + +type BlockLocateRes struct { + Key string + Res []bs.LocateInfo +} + +var blockLocateCmd = &cmds.Command{ + Helptext: cmds.HelpText{ + Tagline: "Locate an IPFS block.", + ShortDescription: ` +'ipfs block rm' is a plumbing command for locating which +sub-datastores block(s) are located in. +`, + }, + Arguments: []cmds.Argument{ + cmds.StringArg("hash", true, true, "Bash58 encoded multihash of block(s) to check."), + }, + Options: []cmds.Option{ + cmds.BoolOption("quiet", "q", "Write minimal output.").Default(false), + }, + Run: func(req cmds.Request, res cmds.Response) { + n, err := req.InvocContext().GetNode() + if err != nil { + res.SetError(err, cmds.ErrNormal) + return + } + hashes := req.Arguments() + outChan := make(chan interface{}) + res.SetOutput((<-chan interface{})(outChan)) + go func() { + defer close(outChan) + for _, hash := range hashes { + key, err := cid.Decode(hash) + if err != nil { + panic(err) // FIXME + } + ret := n.Blockstore.Locate(key) + outChan <- &BlockLocateRes{hash, ret} + } + }() + return + }, + PostRun: func(req cmds.Request, res cmds.Response) { + if res.Error() != nil { + return + } + quiet, _, _ := req.Option("quiet").Bool() + outChan, ok := res.Output().(<-chan interface{}) + if !ok { + res.SetError(u.ErrCast(), cmds.ErrNormal) + return + } + res.SetOutput(nil) + + for out := range outChan { + ret := out.(*BlockLocateRes) + for _, inf := range ret.Res { + if quiet && inf.Error == nil { + fmt.Fprintf(res.Stdout(), "%s %s\n", ret.Key, inf.Prefix) + } else if !quiet && inf.Error == nil { + fmt.Fprintf(res.Stdout(), "%s %s found\n", ret.Key, inf.Prefix) + } else if !quiet { + fmt.Fprintf(res.Stdout(), "%s %s error %s\n", ret.Key, inf.Prefix, inf.Error.Error()) + } + } + } + }, + Type: BlockLocateRes{}, +} From e32702ea568ac25849ab2f6ea10ee2fcef2909f5 Mon Sep 17 00:00:00 2001 From: Kevin Atkinson Date: Sun, 21 Aug 2016 21:33:47 -0400 Subject: [PATCH 4/7] "block/filestore rm": Allow removal of pinned but duplicate blocks. License: MIT Signed-off-by: Kevin Atkinson --- blocks/blockstore/util/remove.go | 29 ++++++++++++++++++++++++----- 1 file changed, 24 insertions(+), 5 deletions(-) diff --git a/blocks/blockstore/util/remove.go b/blocks/blockstore/util/remove.go index 01f2ce44e31..7c72c29d9d3 100644 --- a/blocks/blockstore/util/remove.go +++ b/blocks/blockstore/util/remove.go @@ -27,14 +27,23 @@ type RmBlocksOpts struct { Force bool } -func RmBlocks(blocks bs.GCBlockstore, pins pin.Pinner, out chan<- interface{}, cids []*cid.Cid, opts RmBlocksOpts) error { +func RmBlocks(mbs bs.MultiBlockstore, pins pin.Pinner, out chan<- interface{}, cids []*cid.Cid, opts RmBlocksOpts) error { + prefix := opts.Prefix + if prefix == "" { + prefix = mbs.Mounts()[0] + } + blocks := mbs.Mount(prefix) + if blocks == nil { + return fmt.Errorf("Could not find blockstore: %s\n", prefix) + } + go func() { defer close(out) - unlocker := blocks.GCLock() + unlocker := mbs.GCLock() defer unlocker.Unlock() - stillOkay := FilterPinned(pins, out, cids) + stillOkay := FilterPinned(mbs, pins, out, cids, prefix) for _, c := range stillOkay { err := blocks.DeleteBlock(c) @@ -50,7 +59,7 @@ func RmBlocks(blocks bs.GCBlockstore, pins pin.Pinner, out chan<- interface{}, c return nil } -func FilterPinned(pins pin.Pinner, out chan<- interface{}, cids []*cid.Cid) []*cid.Cid { +func FilterPinned(mbs bs.MultiBlockstore, pins pin.Pinner, out chan<- interface{}, cids []*cid.Cid, prefix string) []*cid.Cid { stillOkay := make([]*cid.Cid, 0, len(cids)) res, err := pins.CheckIfPinned(cids...) if err != nil { @@ -58,7 +67,7 @@ func FilterPinned(pins pin.Pinner, out chan<- interface{}, cids []*cid.Cid) []*c return nil } for _, r := range res { - if !r.Pinned() { + if !r.Pinned() || AvailableElsewhere(mbs, prefix, r.Key) { stillOkay = append(stillOkay, r.Key) } else { out <- &RemovedBlock{ @@ -70,6 +79,16 @@ func FilterPinned(pins pin.Pinner, out chan<- interface{}, cids []*cid.Cid) []*c return stillOkay } +func AvailableElsewhere(mbs bs.MultiBlockstore, prefix string, c *cid.Cid) bool { + locations := mbs.Locate(c) + for _, loc := range locations { + if loc.Error == nil && loc.Prefix != prefix { + return true + } + } + return false +} + func ProcRmOutput(in <-chan interface{}, sout io.Writer, serr io.Writer) error { someFailed := false for res := range in { From 9c8159bfb5fd63ebd8171813cc6eef618ee08633 Mon Sep 17 00:00:00 2001 From: Kevin Atkinson Date: Sun, 21 Aug 2016 22:08:00 -0400 Subject: [PATCH 5/7] "add": add "--allow-dup" option. This option adds a files the the primary blockstore even if the block is in another blockstore such as the filestore. License: MIT Signed-off-by: Kevin Atkinson --- core/commands/add.go | 28 ++++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/core/commands/add.go b/core/commands/add.go index 52613ca4cf1..957bbd67ea7 100644 --- a/core/commands/add.go +++ b/core/commands/add.go @@ -7,7 +7,8 @@ import ( "github.com/ipfs/go-ipfs/core/coreunix" "gx/ipfs/QmeWjRodbcZFKe5tMN7poEx3izym6osrLSnTLf9UjJZBbs/pb" - blockservice "github.com/ipfs/go-ipfs/blockservice" + bs "github.com/ipfs/go-ipfs/blocks/blockstore" + bserv "github.com/ipfs/go-ipfs/blockservice" cmds "github.com/ipfs/go-ipfs/commands" files "github.com/ipfs/go-ipfs/commands/files" core "github.com/ipfs/go-ipfs/core" @@ -33,6 +34,7 @@ const ( chunkerOptionName = "chunker" pinOptionName = "pin" rawLeavesOptionName = "raw-leaves" + allowDupName = "allow-dup" ) var AddCmd = &cmds.Command{ @@ -80,6 +82,7 @@ You can now refer to the added file in a gateway, like so: cmds.StringOption(chunkerOptionName, "s", "Chunking algorithm to use."), cmds.BoolOption(pinOptionName, "Pin this object when adding.").Default(true), cmds.BoolOption(rawLeavesOptionName, "Use raw blocks for leaf nodes. (experimental)"), + cmds.BoolOption(allowDupName, "Add even if blocks are in non-cache blockstore.").Default(false), }, PreRun: func(req cmds.Request) error { if quiet, _, _ := req.Option(quietOptionName).Bool(); quiet { @@ -138,6 +141,7 @@ You can now refer to the added file in a gateway, like so: chunker, _, _ := req.Option(chunkerOptionName).String() dopin, _, _ := req.Option(pinOptionName).Bool() rawblks, _, _ := req.Option(rawLeavesOptionName).Bool() + allowDup, _, _ := req.Option(allowDupName).Bool() if hash { nilnode, err := core.NewNode(n.Context(), &core.BuildCfg{ @@ -152,18 +156,30 @@ You can now refer to the added file in a gateway, like so: n = nilnode } - dserv := n.DAG + exchange := n.Exchange local, _, _ := req.Option("local").Bool() if local { - offlineexch := offline.Exchange(n.Blockstore) - bserv := blockservice.New(n.Blockstore, offlineexch) - dserv = dag.NewDAGService(bserv) + exchange = offline.Exchange(n.Blockstore) } outChan := make(chan interface{}, 8) res.SetOutput((<-chan interface{})(outChan)) - fileAdder, err := coreunix.NewAdder(req.Context(), n.Pinning, n.Blockstore, dserv) + var fileAdder *coreunix.Adder + if allowDup { + // add directly to the first mount bypassing + // the Has() check of the multi-blockstore + blockstore := bs.NewGCBlockstore(n.Blockstore.FirstMount(), n.Blockstore) + blockService := bserv.NewWriteThrough(blockstore, exchange) + dagService := dag.NewDAGService(blockService) + fileAdder, err = coreunix.NewAdder(req.Context(), n.Pinning, blockstore, dagService) + } else if exchange != n.Exchange { + blockService := bserv.New(n.Blockstore, exchange) + dagService := dag.NewDAGService(blockService) + fileAdder, err = coreunix.NewAdder(req.Context(), n.Pinning, n.Blockstore, dagService) + } else { + fileAdder, err = coreunix.NewAdder(req.Context(), n.Pinning, n.Blockstore, n.DAG) + } if err != nil { res.SetError(err, cmds.ErrNormal) return From 58cc20e2957632beb5c0e166365f4f681446d1af Mon Sep 17 00:00:00 2001 From: Kevin Atkinson Date: Tue, 4 Oct 2016 13:44:20 -0400 Subject: [PATCH 6/7] In NewBlockstoreWPrefix don't assume "" is the default value. License: MIT Signed-off-by: Kevin Atkinson --- blocks/blockstore/blockstore.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/blocks/blockstore/blockstore.go b/blocks/blockstore/blockstore.go index 4c53d9e4154..8c971d89a47 100644 --- a/blocks/blockstore/blockstore.go +++ b/blocks/blockstore/blockstore.go @@ -73,13 +73,10 @@ type gcBlockstore struct { } func NewBlockstore(d ds.Batching) *blockstore { - return NewBlockstoreWPrefix(d, "") + return NewBlockstoreWPrefix(d, DefaultPrefix) } func NewBlockstoreWPrefix(d ds.Batching, prefix string) *blockstore { - if prefix == "" { - prefix = DefaultPrefix - } var dsb ds.Batching prefixKey := ds.NewKey(prefix) dd := dsns.Wrap(d, prefixKey) From 008de00fcbbd11343ef8d0ea39490f5747b7eae5 Mon Sep 17 00:00:00 2001 From: Kevin Atkinson Date: Sun, 16 Oct 2016 01:44:09 -0400 Subject: [PATCH 7/7] Minor refactor of "block rm" command. License: MIT Signed-off-by: Kevin Atkinson --- core/commands/block.go | 60 ++++++++++++++++++++++-------------------- 1 file changed, 32 insertions(+), 28 deletions(-) diff --git a/core/commands/block.go b/core/commands/block.go index 4fa7a238e8f..12e40aa95ff 100644 --- a/core/commands/block.go +++ b/core/commands/block.go @@ -240,34 +240,7 @@ It takes a list of base58 encoded multihashs to remove. cmds.BoolOption("quiet", "q", "Write minimal output.").Default(false), }, Run: func(req cmds.Request, res cmds.Response) { - n, err := req.InvocContext().GetNode() - if err != nil { - res.SetError(err, cmds.ErrNormal) - return - } - hashes := req.Arguments() - force, _, _ := req.Option("force").Bool() - quiet, _, _ := req.Option("quiet").Bool() - cids := make([]*cid.Cid, 0, len(hashes)) - for _, hash := range hashes { - c, err := cid.Decode(hash) - if err != nil { - res.SetError(fmt.Errorf("invalid content id: %s (%s)", hash, err), cmds.ErrNormal) - return - } - - cids = append(cids, c) - } - outChan := make(chan interface{}) - err = util.RmBlocks(n.Blockstore, n.Pinning, outChan, cids, util.RmBlocksOpts{ - Quiet: quiet, - Force: force, - }) - if err != nil { - res.SetError(err, cmds.ErrNormal) - return - } - res.SetOutput((<-chan interface{})(outChan)) + blockRmRun(req, res, "") }, PostRun: func(req cmds.Request, res cmds.Response) { if res.Error() != nil { @@ -288,6 +261,37 @@ It takes a list of base58 encoded multihashs to remove. Type: util.RemovedBlock{}, } +func blockRmRun(req cmds.Request, res cmds.Response, prefix string) { + n, err := req.InvocContext().GetNode() + if err != nil { + res.SetError(err, cmds.ErrNormal) + return + } + hashes := req.Arguments() + force, _, _ := req.Option("force").Bool() + quiet, _, _ := req.Option("quiet").Bool() + cids := make([]*cid.Cid, 0, len(hashes)) + for _, hash := range hashes { + c, err := cid.Decode(hash) + if err != nil { + res.SetError(fmt.Errorf("invalid content id: %s (%s)", hash, err), cmds.ErrNormal) + return + } + cids = append(cids, c) + } + outChan := make(chan interface{}) + err = util.RmBlocks(n.Blockstore, n.Pinning, outChan, cids, util.RmBlocksOpts{ + Quiet: quiet, + Force: force, + Prefix: prefix, + }) + if err != nil { + res.SetError(err, cmds.ErrNormal) + return + } + res.SetOutput((<-chan interface{})(outChan)) +} + type BlockLocateRes struct { Key string Res []bs.LocateInfo