-
-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
WIP: Add support for multiple blockstores #3257
Changes from all commits
3b752eb
a93b152
d31cb4b
e32702e
9c8159b
58cc20e
008de00
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -104,11 +104,11 @@ func TestHasIsBloomCached(t *testing.T) { | |
block := blocks.NewBlock([]byte("newBlock")) | ||
|
||
cachedbs.PutMany([]blocks.Block{block}) | ||
if cacheFails != 2 { | ||
t.Fatalf("expected two datastore hits: %d", cacheFails) | ||
if cacheFails != 1 { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why does this change? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I verified this was correct, the change was due to an implementation detail. Unfortually, I can't remember why. If it is important enough I will spend an hour or two looking into it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It always weirds me out when i see tests changing. Maybe @Kubuxu is more familiar with this and can check it? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It was to because This means that now we don't check the datastore with https://github.com/ipfs/go-ipfs/pull/3257/files#r80438802 |
||
t.Fatalf("expected datastore hits: %d", cacheFails) | ||
} | ||
cachedbs.Put(block) | ||
if cacheFails != 3 { | ||
if cacheFails != 2 { | ||
t.Fatalf("expected datastore hit: %d", cacheFails) | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,158 @@ | ||
package blockstore | ||
|
||
// A very simple multi-blockstore that analogous to a unionfs Put and | ||
// DeleteBlock only go to the first blockstore all others are | ||
// considered readonly. | ||
|
||
import ( | ||
//"errors" | ||
"context" | ||
|
||
blocks "github.com/ipfs/go-ipfs/blocks" | ||
cid "gx/ipfs/QmXfiyr2RWEXpVDdaYnD2HNiBk6UBddsvEP4RPfXb6nGqY/go-cid" | ||
dsq "gx/ipfs/QmbzuUusHqaLLoNTDEVLcSF6vZDHZDLPC7p4bztRvvkXxU/go-datastore/query" | ||
) | ||
|
||
type LocateInfo struct { | ||
Prefix string | ||
Error error | ||
} | ||
|
||
type MultiBlockstore interface { | ||
Blockstore | ||
GCLocker | ||
FirstMount() Blockstore | ||
Mounts() []string | ||
Mount(prefix string) Blockstore | ||
Locate(*cid.Cid) []LocateInfo | ||
} | ||
|
||
type Mount struct { | ||
Prefix string | ||
Blocks Blockstore | ||
} | ||
|
||
func NewMultiBlockstore(mounts ...Mount) *multiblockstore { | ||
return &multiblockstore{ | ||
mounts: mounts, | ||
} | ||
} | ||
|
||
type multiblockstore struct { | ||
mounts []Mount | ||
gclocker | ||
} | ||
|
||
func (bs *multiblockstore) FirstMount() Blockstore { | ||
return bs.mounts[0].Blocks | ||
} | ||
|
||
func (bs *multiblockstore) Mounts() []string { | ||
mounts := make([]string, 0, len(bs.mounts)) | ||
for _, mnt := range bs.mounts { | ||
mounts = append(mounts, mnt.Prefix) | ||
} | ||
return mounts | ||
} | ||
|
||
func (bs *multiblockstore) Mount(prefix string) Blockstore { | ||
for _, m := range bs.mounts { | ||
if m.Prefix == prefix { | ||
return m.Blocks | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
func (bs *multiblockstore) DeleteBlock(key *cid.Cid) error { | ||
return bs.mounts[0].Blocks.DeleteBlock(key) | ||
} | ||
|
||
func (bs *multiblockstore) Has(c *cid.Cid) (bool, error) { | ||
var firstErr error | ||
for _, m := range bs.mounts { | ||
have, err := m.Blocks.Has(c) | ||
if have && err == nil { | ||
return have, nil | ||
} | ||
if err != nil && firstErr == nil { | ||
firstErr = err | ||
} | ||
} | ||
return false, firstErr | ||
} | ||
|
||
func (bs *multiblockstore) Get(c *cid.Cid) (blocks.Block, error) { | ||
var firstErr error | ||
for _, m := range bs.mounts { | ||
blk, err := m.Blocks.Get(c) | ||
if err == nil { | ||
return blk, nil | ||
} | ||
if firstErr == nil || firstErr == ErrNotFound { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would replace it with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If I do that than firstErr could end up being nil even if the block was not found. The idea is to return ErrNotFound only if that is the case for all the blockstore's. If not than return the more serious error. |
||
firstErr = err | ||
} | ||
} | ||
return nil, firstErr | ||
} | ||
|
||
func (bs *multiblockstore) Locate(c *cid.Cid) []LocateInfo { | ||
res := make([]LocateInfo, 0, len(bs.mounts)) | ||
for _, m := range bs.mounts { | ||
_, err := m.Blocks.Get(c) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this should use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am fine with that. Not 100% happy with it as filestore Has doesn't really guarantee the block is available (and changing it so it does will make the Has() call really expensive), but its not a major problem. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I partly take the Has() comment back. The filestore Has() needs fixing or the Put below needs fixing. |
||
res = append(res, LocateInfo{m.Prefix, err}) | ||
} | ||
return res | ||
} | ||
|
||
func (bs *multiblockstore) Put(blk blocks.Block) error { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So if i add a file with the filestore, then i add another file normally that overlaps a block with it. one block from my 'normal' add will be referencing a file on disk? That feels odd to me. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, but rather difficult to avoid. |
||
// First call Has() to make sure the block doesn't exist in any of | ||
// the sub-blockstores, otherwise we could end with data being | ||
// duplicated in two blockstores. | ||
exists, err := bs.Has(blk.Cid()) | ||
if err == nil && exists { | ||
return nil // already stored | ||
} | ||
return bs.mounts[0].Blocks.Put(blk) | ||
} | ||
|
||
func (bs *multiblockstore) PutMany(blks []blocks.Block) error { | ||
stilladd := make([]blocks.Block, 0, len(blks)) | ||
// First call Has() to make sure the block doesn't exist in any of | ||
// the sub-blockstores, otherwise we could end with data being | ||
// duplicated in two blockstores. | ||
for _, blk := range blks { | ||
exists, err := bs.Has(blk.Cid()) | ||
if err == nil && exists { | ||
continue // already stored | ||
} | ||
stilladd = append(stilladd, blk) | ||
} | ||
if len(stilladd) == 0 { | ||
return nil | ||
} | ||
return bs.mounts[0].Blocks.PutMany(stilladd) | ||
} | ||
|
||
func (bs *multiblockstore) AllKeysChan(ctx context.Context) (<-chan *cid.Cid, error) { | ||
//return bs.mounts[0].Blocks.AllKeysChan(ctx) | ||
//return nil, errors.New("Unimplemented") | ||
in := make([]<-chan *cid.Cid, 0, len(bs.mounts)) | ||
for _, m := range bs.mounts { | ||
ch, err := m.Blocks.AllKeysChan(ctx) | ||
if err != nil { | ||
return nil, err | ||
} | ||
in = append(in, ch) | ||
} | ||
out := make(chan *cid.Cid, dsq.KeysOnlyBufSize) | ||
go func() { | ||
defer close(out) | ||
for _, in0 := range in { | ||
for key := range in0 { | ||
out <- key | ||
} | ||
} | ||
}() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If @whyrusleeping agrees with me I would open a goroutine per BS and make them pipe from one AllKeysChan to one external. This way if first BS is slow and second is fast, the first won't slow down whole process. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
return out, nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,14 +27,23 @@ type RmBlocksOpts struct { | |
Force bool | ||
} | ||
|
||
func RmBlocks(blocks bs.GCBlockstore, pins pin.Pinner, out chan<- interface{}, cids []*cid.Cid, opts RmBlocksOpts) error { | ||
func RmBlocks(mbs bs.MultiBlockstore, pins pin.Pinner, out chan<- interface{}, cids []*cid.Cid, opts RmBlocksOpts) error { | ||
prefix := opts.Prefix | ||
if prefix == "" { | ||
prefix = mbs.Mounts()[0] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. does this only remove from one blockstore? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. |
||
} | ||
blocks := mbs.Mount(prefix) | ||
if blocks == nil { | ||
return fmt.Errorf("Could not find blockstore: %s\n", prefix) | ||
} | ||
|
||
go func() { | ||
defer close(out) | ||
|
||
unlocker := blocks.GCLock() | ||
unlocker := mbs.GCLock() | ||
defer unlocker.Unlock() | ||
|
||
stillOkay := FilterPinned(pins, out, cids) | ||
stillOkay := FilterPinned(mbs, pins, out, cids, prefix) | ||
|
||
for _, c := range stillOkay { | ||
err := blocks.DeleteBlock(c) | ||
|
@@ -50,15 +59,15 @@ func RmBlocks(blocks bs.GCBlockstore, pins pin.Pinner, out chan<- interface{}, c | |
return nil | ||
} | ||
|
||
func FilterPinned(pins pin.Pinner, out chan<- interface{}, cids []*cid.Cid) []*cid.Cid { | ||
func FilterPinned(mbs bs.MultiBlockstore, pins pin.Pinner, out chan<- interface{}, cids []*cid.Cid, prefix string) []*cid.Cid { | ||
stillOkay := make([]*cid.Cid, 0, len(cids)) | ||
res, err := pins.CheckIfPinned(cids...) | ||
if err != nil { | ||
out <- &RemovedBlock{Error: fmt.Sprintf("pin check failed: %s", err)} | ||
return nil | ||
} | ||
for _, r := range res { | ||
if !r.Pinned() { | ||
if !r.Pinned() || AvailableElsewhere(mbs, prefix, r.Key) { | ||
stillOkay = append(stillOkay, r.Key) | ||
} else { | ||
out <- &RemovedBlock{ | ||
|
@@ -70,6 +79,16 @@ func FilterPinned(pins pin.Pinner, out chan<- interface{}, cids []*cid.Cid) []*c | |
return stillOkay | ||
} | ||
|
||
func AvailableElsewhere(mbs bs.MultiBlockstore, prefix string, c *cid.Cid) bool { | ||
locations := mbs.Locate(c) | ||
for _, loc := range locations { | ||
if loc.Error == nil && loc.Prefix != prefix { | ||
return true | ||
} | ||
} | ||
return false | ||
} | ||
|
||
func ProcRmOutput(in <-chan interface{}, sout io.Writer, serr io.Writer) error { | ||
someFailed := false | ||
for res := range in { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree its probably okay to remove this and do it intentionally in a certain place (instead of at every layer). But this gets a bit weird... For example, if someone just wants to use the blockstore on its own, they don't get this nice optimization.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you sure about that? MultiBlockstore just calls Has method of Blockstore but it doesn't mean that Blockstore (on Datastore) will check the Has of datastore.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
UPDATE: ahh, you call an explicit
Has
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So can I take it that this is resolved?