From 259aeb41eeeffa7ba3f7df50372bcbbefd201b98 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Thu, 4 Aug 2016 12:45:45 -0700 Subject: [PATCH] integrate the HAMT into mfs License: MIT Signed-off-by: Jeromy --- assets/assets.go | 13 +++- core/commands/files/files.go | 8 ++- core/coreunix/add.go | 7 +- mfs/dir.go | 108 ++++++++++++++++++------------ mfs/mfs_test.go | 125 +++++++++++++++++++++++++++++++++++ mfs/system.go | 6 +- unixfs/io/dirbuilder.go | 7 +- 7 files changed, 226 insertions(+), 48 deletions(-) diff --git a/assets/assets.go b/assets/assets.go index 4965b0f5afe..30371b7ceda 100644 --- a/assets/assets.go +++ b/assets/assets.go @@ -54,12 +54,21 @@ func addAssetList(nd *core.IpfsNode, l []string) (*key.Key, error) { fname := filepath.Base(p) k := key.B58KeyDecode(s) - if err := dirb.AddChild(nd.Context(), fname, k); err != nil { + node, err := nd.DAG.Get(nd.Context(), k) + if err != nil { + return nil, err + } + + if err := dirb.AddChild(nd.Context(), fname, node); err != nil { return nil, fmt.Errorf("assets: could not add '%s' as a child: %s", fname, err) } } - dir := dirb.GetNode() + dir, err := dirb.GetNode() + if err != nil { + return nil, err + } + dkey, err := nd.DAG.Add(dir) if err != nil { return nil, fmt.Errorf("assets: DAG.Add(dir) failed: %s", err) diff --git a/core/commands/files/files.go b/core/commands/files/files.go index 750cddaa94e..d652988818c 100644 --- a/core/commands/files/files.go +++ b/core/commands/files/files.go @@ -342,7 +342,13 @@ Examples: case *mfs.Directory: if !long { var output []mfs.NodeListing - for _, name := range fsn.ListNames() { + names, err := fsn.ListNames() + if err != nil { + res.SetError(err, cmds.ErrNormal) + return + } + + for _, name := range names { output = append(output, mfs.NodeListing{ Name: name, }) diff --git a/core/coreunix/add.go b/core/coreunix/add.go index 201c43f1cbd..4140a6fb91e 100644 --- a/core/coreunix/add.go +++ b/core/coreunix/add.go @@ -220,7 +220,12 @@ func (adder *Adder) outputDirs(path string, fsn mfs.FSNode) error { case *mfs.File: return nil case *mfs.Directory: - for _, name := range fsn.ListNames() { + names, err := fsn.ListNames() + if err != nil { + return err + } + + for _, name := range names { child, err := fsn.Child(name) if err != nil { return err diff --git a/mfs/dir.go b/mfs/dir.go index 9009d2431b8..a34e3c95f87 100644 --- a/mfs/dir.go +++ b/mfs/dir.go @@ -13,6 +13,7 @@ import ( dag "github.com/ipfs/go-ipfs/merkledag" ft "github.com/ipfs/go-ipfs/unixfs" + uio "github.com/ipfs/go-ipfs/unixfs/io" ufspb "github.com/ipfs/go-ipfs/unixfs/pb" ) @@ -28,25 +29,31 @@ type Directory struct { files map[string]*File lock sync.Mutex - node *dag.Node ctx context.Context + dirbuilder *uio.Directory + modTime time.Time name string } -func NewDirectory(ctx context.Context, name string, node *dag.Node, parent childCloser, dserv dag.DAGService) *Directory { - return &Directory{ - dserv: dserv, - ctx: ctx, - name: name, - node: node, - parent: parent, - childDirs: make(map[string]*Directory), - files: make(map[string]*File), - modTime: time.Now(), +func NewDirectory(ctx context.Context, name string, node *dag.Node, parent childCloser, dserv dag.DAGService) (*Directory, error) { + db, err := uio.NewDirectoryFromNode(dserv, node) + if err != nil { + return nil, err } + + return &Directory{ + dserv: dserv, + ctx: ctx, + name: name, + dirbuilder: db, + parent: parent, + childDirs: make(map[string]*Directory), + files: make(map[string]*File), + modTime: time.Now(), + }, nil } // closeChild updates the child by the given name to the dag node 'nd' @@ -80,21 +87,21 @@ func (d *Directory) closeChildUpdate(name string, nd *dag.Node, sync bool) (*dag } func (d *Directory) flushCurrentNode() (*dag.Node, error) { - _, err := d.dserv.Add(d.node) + nd, err := d.dirbuilder.GetNode() if err != nil { return nil, err } - return d.node.Copy(), nil + _, err = d.dserv.Add(nd) + if err != nil { + return nil, err + } + + return nd.Copy(), nil } func (d *Directory) updateChild(name string, nd *dag.Node) error { - err := d.node.RemoveNodeLink(name) - if err != nil && err != dag.ErrNotFound { - return err - } - - err = d.node.AddNodeLinkClean(name, nd) + err := d.dirbuilder.AddChild(d.ctx, name, nd) if err != nil { return err } @@ -128,7 +135,11 @@ func (d *Directory) cacheNode(name string, nd *dag.Node) (FSNode, error) { switch i.GetType() { case ufspb.Data_Directory: - ndir := NewDirectory(d.ctx, name, nd, d, d.dserv) + ndir, err := NewDirectory(d.ctx, name, nd, d, d.dserv) + if err != nil { + return nil, err + } + d.childDirs[name] = ndir return ndir, nil case ufspb.Data_File, ufspb.Data_Raw, ufspb.Data_Symlink: @@ -162,13 +173,12 @@ func (d *Directory) Uncache(name string) { // childFromDag searches through this directories dag node for a child link // with the given name func (d *Directory) childFromDag(name string) (*dag.Node, error) { - for _, lnk := range d.node.Links { - if lnk.Name == name { - return lnk.GetNode(d.ctx, d.dserv) - } + nd, err := d.dirbuilder.Find(d.ctx, name) + if err != nil { + return nil, err } - return nil, os.ErrNotExist + return nd.Copy(), nil } // childUnsync returns the child under this directory by the given name @@ -194,7 +204,7 @@ type NodeListing struct { Hash string } -func (d *Directory) ListNames() []string { +func (d *Directory) ListNames() ([]string, error) { d.lock.Lock() defer d.lock.Unlock() @@ -206,7 +216,12 @@ func (d *Directory) ListNames() []string { names[n] = struct{}{} } - for _, l := range d.node.Links { + links, err := d.dirbuilder.Links() + if err != nil { + return nil, err + } + + for _, l := range links { names[l.Name] = struct{}{} } @@ -216,7 +231,7 @@ func (d *Directory) ListNames() []string { } sort.Strings(out) - return out + return out, nil } func (d *Directory) List() ([]NodeListing, error) { @@ -224,7 +239,13 @@ func (d *Directory) List() ([]NodeListing, error) { defer d.lock.Unlock() var out []NodeListing - for _, l := range d.node.Links { + + links, err := d.dirbuilder.Links() + if err != nil { + return nil, err + } + + for _, l := range links { child := NodeListing{} child.Name = l.Name @@ -275,20 +296,23 @@ func (d *Directory) Mkdir(name string) (*Directory, error) { } } - ndir := new(dag.Node) - ndir.SetData(ft.FolderPBData()) + ndir := ft.EmptyDirNode() _, err = d.dserv.Add(ndir) if err != nil { return nil, err } - err = d.node.AddNodeLinkClean(name, ndir) + err = d.dirbuilder.AddChild(d.ctx, name, ndir) + if err != nil { + return nil, err + } + + dirobj, err := NewDirectory(d.ctx, name, ndir, d, d.dserv) if err != nil { return nil, err } - dirobj := NewDirectory(d.ctx, name, ndir, d, d.dserv) d.childDirs[name] = dirobj return dirobj, nil } @@ -300,12 +324,7 @@ func (d *Directory) Unlink(name string) error { delete(d.childDirs, name) delete(d.files, name) - err := d.node.RemoveNodeLink(name) - if err != nil { - return err - } - - _, err = d.dserv.Add(d.node) + err := d.dirbuilder.RemoveChild(d.ctx, name) if err != nil { return err } @@ -340,7 +359,7 @@ func (d *Directory) AddChild(name string, nd *dag.Node) error { return err } - err = d.node.AddNodeLinkClean(name, nd) + err = d.dirbuilder.AddChild(d.ctx, name, nd) if err != nil { return err } @@ -396,10 +415,15 @@ func (d *Directory) GetNode() (*dag.Node, error) { return nil, err } - _, err = d.dserv.Add(d.node) + nd, err := d.dirbuilder.GetNode() + if err != nil { + return nil, err + } + + _, err = d.dserv.Add(nd) if err != nil { return nil, err } - return d.node.Copy(), nil + return nd, err } diff --git a/mfs/mfs_test.go b/mfs/mfs_test.go index 383bcfd73a0..c42f66d7c28 100644 --- a/mfs/mfs_test.go +++ b/mfs/mfs_test.go @@ -747,6 +747,67 @@ func TestMfsStress(t *testing.T) { } } +func TestMfsHugeDir(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + _, rt := setupRoot(ctx, t) + + for i := 0; i < 100000; i++ { + err := Mkdir(rt, fmt.Sprintf("/dir%d", i), false, false) + if err != nil { + t.Fatal(err) + } + } +} + +func TestMkdirP(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + _, rt := setupRoot(ctx, t) + + err := Mkdir(rt, "/a/b/c/d/e/f", true, true) + if err != nil { + t.Fatal(err) + } +} + +func TestConcurrentWriteAndFlush(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + ds, rt := setupRoot(ctx, t) + + d := mkdirP(t, rt.GetValue().(*Directory), "foo/bar/baz") + fn := fileNodeFromReader(t, ds, bytes.NewBuffer(nil)) + err := d.AddChild("file", fn) + if err != nil { + t.Fatal(err) + } + + nloops := 5000 + + wg := new(sync.WaitGroup) + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < nloops; i++ { + err := writeFile(rt, "/foo/bar/baz/file", []byte("STUFF")) + if err != nil { + t.Error("file write failed: ", err) + return + } + } + }() + + for i := 0; i < nloops; i++ { + _, err := rt.GetValue().GetNode() + if err != nil { + t.Fatal(err) + } + } + + wg.Wait() +} + func TestFlushing(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -891,6 +952,70 @@ func TestConcurrentReads(t *testing.T) { } wg.Wait() } +func writeFile(rt *Root, path string, data []byte) error { + n, err := Lookup(rt, path) + if err != nil { + return err + } + + fi, ok := n.(*File) + if !ok { + return fmt.Errorf("expected to receive a file, but didnt get one") + } + + fd, err := fi.Open(OpenWriteOnly, true) + if err != nil { + return err + } + defer fd.Close() + + nw, err := fd.Write(data) + if err != nil { + return err + } + + if nw != 10 { + fmt.Errorf("wrote incorrect amount") + } + + return nil +} + +func TestConcurrentWrites(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ds, rt := setupRoot(ctx, t) + + rootdir := rt.GetValue().(*Directory) + + path := "a/b/c" + d := mkdirP(t, rootdir, path) + + fi := fileNodeFromReader(t, ds, bytes.NewReader(make([]byte, 0))) + err := d.AddChild("afile", fi) + if err != nil { + t.Fatal(err) + } + + var wg sync.WaitGroup + nloops := 100 + for i := 0; i < 10; i++ { + wg.Add(1) + go func(me int) { + defer wg.Done() + mybuf := bytes.Repeat([]byte{byte(me)}, 10) + for j := 0; j < nloops; j++ { + err := writeFile(rt, "a/b/c/afile", mybuf) + if err != nil { + t.Error("writefile failed: ", err) + return + } + } + }(i) + } + wg.Wait() +} func TestFileDescriptors(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) diff --git a/mfs/system.go b/mfs/system.go index 40d9d29cdb7..cffb0da55a3 100644 --- a/mfs/system.go +++ b/mfs/system.go @@ -91,7 +91,11 @@ func NewRoot(parent context.Context, ds dag.DAGService, node *dag.Node, pf PubFu switch pbn.GetType() { case ft.TDirectory: - root.val = NewDirectory(parent, ndk.String(), node, root, ds) + rdir, err := NewDirectory(parent, ndk.String(), node, root, ds) + if err != nil { + return nil, err + } + root.val = rdir case ft.TFile, ft.TMetadata, ft.TRaw: fi, err := NewFile(ndk.String(), node, root, ds) if err != nil { diff --git a/unixfs/io/dirbuilder.go b/unixfs/io/dirbuilder.go index 53c58ff2ed0..1b8ffb7426c 100644 --- a/unixfs/io/dirbuilder.go +++ b/unixfs/io/dirbuilder.go @@ -2,6 +2,7 @@ package io import ( "fmt" + "os" "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context" @@ -109,8 +110,12 @@ func (d *Directory) Links() ([]*mdag.Link, error) { func (d *Directory) Find(ctx context.Context, name string) (*mdag.Node, error) { if d.shard == nil { lnk, err := d.dirnode.GetNodeLink(name) - if err != nil { + switch err { + case mdag.ErrLinkNotFound: + return nil, os.ErrNotExist + default: return nil, err + case nil: } return d.dserv.Get(ctx, key.Key(lnk.Hash))