Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add --stream option to ls command #5611

Merged
merged 7 commits into from
Nov 21, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
295 changes: 189 additions & 106 deletions core/commands/ls.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package commands

import (
"bytes"
"fmt"
"io"
"os"
"text/tabwriter"

cmds "github.com/ipfs/go-ipfs/commands"
e "github.com/ipfs/go-ipfs/core/commands/e"
cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv"
iface "github.com/ipfs/go-ipfs/core/coreapi/interface"

cid "gx/ipfs/QmR8BauakNcBa3RbE4nbQu76PDiJgoQgz8AJdhJuiU4TAw/go-cid"
Expand All @@ -16,29 +15,36 @@ import (
unixfspb "gx/ipfs/QmUnHNqhSB1JgzVCxL1Kz3yb4bdyB4q1Z9AD5AUBVmt3fZ/go-unixfs/pb"
blockservice "gx/ipfs/QmVDTbzzTwnuBwNbJdhW3u7LoBQp46bezm9yp4z1RoEepM/go-blockservice"
offline "gx/ipfs/QmYZwey1thDTynSrvd6qQkX24UpTka6TFhQ2v569UpoqxD/go-ipfs-exchange-offline"
cmds "gx/ipfs/Qma6uuSyjkecGhMFFLfzyJDPyoDtNJSHJNweDccZhaWkgU/go-ipfs-cmds"
merkledag "gx/ipfs/QmcGt25mrjuB2kKW2zhPbXVZNHc4yoTDQ65NA8m6auP2f1/go-merkledag"
ipld "gx/ipfs/QmcKKBwfz6FyQdHR2jsXrrF6XeSBXYL86anmWNewpFpoF5/go-ipld-format"
"gx/ipfs/Qmde5VP1qUkyQXKCfmEUA7bP64V2HAptbJ7phuPp7jXWwg/go-ipfs-cmdkit"
)

// LsLink contains printable data for a single ipld link in ls output
type LsLink struct {
Name, Hash string
Size uint64
Type unixfspb.Data_DataType
}

// LsObject is an element of LsOutput
// It can represent all or part of a directory
type LsObject struct {
Hash string
Links []LsLink
}

// LsOutput is a set of printable data for directories,
// it can be complete or partial
type LsOutput struct {
Objects []LsObject
}

const (
lsHeadersOptionNameTime = "headers"
lsResolveTypeOptionName = "resolve-type"
lsStreamOptionName = "stream"
)

var LsCmd = &cmds.Command{
Expand All @@ -60,158 +66,235 @@ The JSON output contains type information.
Options: []cmdkit.Option{
cmdkit.BoolOption(lsHeadersOptionNameTime, "v", "Print table headers (Hash, Size, Name)."),
cmdkit.BoolOption(lsResolveTypeOptionName, "Resolve linked objects to find out their types.").WithDefault(true),
cmdkit.BoolOption(lsStreamOptionName, "s", "Enable exprimental streaming of directory entries as they are traversed."),
},
Run: func(req cmds.Request, res cmds.Response) {
nd, err := req.InvocContext().GetNode()
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
nd, err := cmdenv.GetNode(env)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
return err
}

api, err := req.InvocContext().GetApi()
api, err := cmdenv.GetApi(env)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}

// get options early -> exit early in case of error
if _, _, err := req.Option(lsHeadersOptionNameTime).Bool(); err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}

resolve, _, err := req.Option(lsResolveTypeOptionName).Bool()
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
return err
}

resolve, _ := req.Options[lsResolveTypeOptionName].(bool)
dserv := nd.DAG
if !resolve {
offlineexch := offline.Exchange(nd.Blockstore)
bserv := blockservice.New(nd.Blockstore, offlineexch)
dserv = merkledag.NewDAGService(bserv)
}

paths := req.Arguments()
err = req.ParseBodyArgs()
if err != nil {
return err
}

hannahhoward marked this conversation as resolved.
Show resolved Hide resolved
paths := req.Arguments

var dagnodes []ipld.Node
for _, fpath := range paths {
p, err := iface.ParsePath(fpath)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
return err
}

dagnode, err := api.ResolveNode(req.Context(), p)
dagnode, err := api.ResolveNode(req.Context, p)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
return err
}
dagnodes = append(dagnodes, dagnode)
}

output := make([]LsObject, len(req.Arguments()))
ng := merkledag.NewSession(req.Context(), nd.DAG)
ng := merkledag.NewSession(req.Context, nd.DAG)
ro := merkledag.NewReadOnlyDagService(ng)

stream, _ := req.Options[lsStreamOptionName].(bool)

if !stream {
output := make([]LsObject, len(req.Arguments))

for i, dagnode := range dagnodes {
dir, err := uio.NewDirectoryFromNode(ro, dagnode)
if err != nil && err != uio.ErrNotADir {
return fmt.Errorf("the data in %s (at %q) is not a UnixFS directory: %s", dagnode.Cid(), paths[i], err)
}

var links []*ipld.Link
if dir == nil {
links = dagnode.Links()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gah... #4186. I'm going to just break this one day...

} else {
links, err = dir.Links(req.Context)
if err != nil {
return err
}
}
outputLinks := make([]LsLink, len(links))
for j, link := range links {
lsLink, err := makeLsLink(req, dserv, resolve, link)
if err != nil {
return err
}
outputLinks[j] = *lsLink
}
output[i] = LsObject{
Hash: paths[i],
Links: outputLinks,
}
}

return cmds.EmitOnce(res, &LsOutput{output})
}

for i, dagnode := range dagnodes {
dir, err := uio.NewDirectoryFromNode(ro, dagnode)
if err != nil && err != uio.ErrNotADir {
res.SetError(fmt.Errorf("the data in %s (at %q) is not a UnixFS directory: %s", dagnode.Cid(), paths[i], err), cmdkit.ErrNormal)
return
return fmt.Errorf("the data in %s (at %q) is not a UnixFS directory: %s", dagnode.Cid(), paths[i], err)
}

var links []*ipld.Link
var linkResults <-chan unixfs.LinkResult
if dir == nil {
links = dagnode.Links()
linkResults = makeDagNodeLinkResults(req, dagnode)
} else {
links, err = dir.Links(req.Context())
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}
}

output[i] = LsObject{
Hash: paths[i],
Links: make([]LsLink, len(links)),
linkResults = dir.EnumLinksAsync(req.Context)
}

for j, link := range links {
t := unixfspb.Data_DataType(-1)

switch link.Cid.Type() {
case cid.Raw:
// No need to check with raw leaves
t = unixfs.TFile
case cid.DagProtobuf:
linkNode, err := link.GetNode(req.Context(), dserv)
if err == ipld.ErrNotFound && !resolve {
// not an error
linkNode = nil
} else if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}
for linkResult := range linkResults {

if pn, ok := linkNode.(*merkledag.ProtoNode); ok {
d, err := unixfs.FSNodeFromBytes(pn.Data())
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}
t = d.Type()
}
if linkResult.Err != nil {
return linkResult.Err
}
link := linkResult.Link
lsLink, err := makeLsLink(req, dserv, resolve, link)
if err != nil {
return err
}
output[i].Links[j] = LsLink{
Name: link.Name,
Hash: link.Cid.String(),
Size: link.Size,
Type: t,
output := []LsObject{{
Hash: paths[i],
Links: []LsLink{*lsLink},
}}
if err = res.Emit(&LsOutput{output}); err != nil {
return err
}
}
}
return nil
},
PostRun: cmds.PostRunMap{
kevina marked this conversation as resolved.
Show resolved Hide resolved
cmds.CLI: func(res cmds.Response, re cmds.ResponseEmitter) error {
req := res.Request()
lastObjectHash := ""

res.SetOutput(&LsOutput{output})
for {
v, err := res.Next()
if err != nil {
if err == io.EOF {
return nil
}
return err
}
out := v.(*LsOutput)
lastObjectHash = tabularOutput(req, os.Stdout, out, lastObjectHash, false)
}
},
},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *LsOutput) error {
// when streaming over HTTP using a text encoder, we cannot render breaks
// between directories because we don't know the hash of the last
// directory encoder
ignoreBreaks, _ := req.Options[lsStreamOptionName].(bool)
tabularOutput(req, w, out, "", ignoreBreaks)
return nil
}),
},
Marshalers: cmds.MarshalerMap{
cmds.Text: func(res cmds.Response) (io.Reader, error) {
Type: LsOutput{},
}

v, err := unwrapOutput(res.Output())
func makeDagNodeLinkResults(req *cmds.Request, dagnode ipld.Node) <-chan unixfs.LinkResult {
links := dagnode.Links()
linkResults := make(chan unixfs.LinkResult, len(links))
defer close(linkResults)
for _, l := range links {
linkResults <- unixfs.LinkResult{
Link: l,
Err: nil,
}
}
magik6k marked this conversation as resolved.
Show resolved Hide resolved
return linkResults
}

func makeLsLink(req *cmds.Request, dserv ipld.DAGService, resolve bool, link *ipld.Link) (*LsLink, error) {
t := unixfspb.Data_DataType(-1)

switch link.Cid.Type() {
case cid.Raw:
// No need to check with raw leaves
t = unixfs.TFile
case cid.DagProtobuf:
linkNode, err := link.GetNode(req.Context, dserv)
if err == ipld.ErrNotFound && !resolve {
// not an error
linkNode = nil
} else if err != nil {
return nil, err
}

if pn, ok := linkNode.(*merkledag.ProtoNode); ok {
d, err := unixfs.FSNodeFromBytes(pn.Data())
if err != nil {
return nil, err
}
t = d.Type()
}
}
return &LsLink{
Name: link.Name,
Hash: link.Cid.String(),
Size: link.Size,
Type: t,
}, nil
}

headers, _, _ := res.Request().Option(lsHeadersOptionNameTime).Bool()
output, ok := v.(*LsOutput)
if !ok {
return nil, e.TypeErr(output, v)
}
func tabularOutput(req *cmds.Request, w io.Writer, out *LsOutput, lastObjectHash string, ignoreBreaks bool) string {
kevina marked this conversation as resolved.
Show resolved Hide resolved
headers, _ := req.Options[lsHeadersOptionNameTime].(bool)
stream, _ := req.Options[lsStreamOptionName].(bool)
// in streaming mode we can't automatically align the tabs
// so we take a best guess
var minTabWidth int
if stream {
minTabWidth = 10
} else {
minTabWidth = 1
}

buf := new(bytes.Buffer)
w := tabwriter.NewWriter(buf, 1, 2, 1, ' ', 0)
for _, object := range output.Objects {
if len(output.Objects) > 1 {
fmt.Fprintf(w, "%s:\n", object.Hash)
}
if headers {
fmt.Fprintln(w, "Hash\tSize\tName")
}
for _, link := range object.Links {
if link.Type == unixfs.TDirectory {
link.Name += "/"
}
fmt.Fprintf(w, "%s\t%v\t%s\n", link.Hash, link.Size, link.Name)
}
if len(output.Objects) > 1 {
fmt.Fprintln(w)
multipleFolders := len(req.Arguments) > 1

tw := tabwriter.NewWriter(w, minTabWidth, 2, 1, ' ', 0)

for _, object := range out.Objects {

if !ignoreBreaks && object.Hash != lastObjectHash {
if multipleFolders {
if lastObjectHash != "" {
fmt.Fprintln(tw)
}
fmt.Fprintf(tw, "%s:\n", object.Hash)
}
w.Flush()
if headers {
fmt.Fprintln(tw, "Hash\tSize\tName")
}
lastObjectHash = object.Hash
}

return buf, nil
},
},
Type: LsOutput{},
for _, link := range object.Links {
if link.Type == unixfs.TDirectory {
link.Name += "/"
}

fmt.Fprintf(tw, "%s\t%v\t%s\n", link.Hash, link.Size, link.Name)
}
}
tw.Flush()
return lastObjectHash
}
Loading