Skip to content

Commit

Permalink
Merge pull request #3403 from ipfs/feat/dg-reader-cleanup
Browse files Browse the repository at this point in the history
Fix panic in dagreader with raw nodes
  • Loading branch information
whyrusleeping authored Nov 22, 2016
2 parents 656c112 + 1fd99da commit 0fcb92f
Show file tree
Hide file tree
Showing 8 changed files with 315 additions and 255 deletions.
2 changes: 1 addition & 1 deletion core/coreunix/cat.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
uio "github.com/ipfs/go-ipfs/unixfs/io"
)

func Cat(ctx context.Context, n *core.IpfsNode, pstr string) (*uio.DagReader, error) {
func Cat(ctx context.Context, n *core.IpfsNode, pstr string) (uio.DagReader, error) {
dagNode, err := core.Resolve(ctx, n.Namesys, n.Resolver, path.Path(pstr))
if err != nil {
return nil, err
Expand Down
10 changes: 10 additions & 0 deletions test/sharness/t0110-gateway.sh
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,16 @@ for cmd in "add" "block/put" "bootstrap" "config" "dht" "diag" "dns" "get" "id"
done
'

test_expect_success "create raw-leaves node" '
echo "This is RAW!" > rfile &&
echo "This is RAW!" | ipfs add --raw-leaves -q > rhash
'

test_expect_success "try fetching it from gateway" '
curl http://127.0.0.1:$port/ipfs/$(cat rhash) > ffile &&
test_cmp rfile ffile
'

test_kill_ipfs_daemon

test_done
2 changes: 1 addition & 1 deletion unixfs/archive/tar/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (w *Writer) writeFile(nd *mdag.ProtoNode, pb *upb.Data, fpath string) error
return err
}

dagr := uio.NewDataFileReader(w.ctx, nd, pb, w.Dag)
dagr := uio.NewPBFileReader(w.ctx, nd, pb, w.Dag)
if _, err := dagr.WriteTo(w.TarW); err != nil {
return err
}
Expand Down
41 changes: 41 additions & 0 deletions unixfs/io/bufdagreader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package io

import (
"bytes"
"context"
"io"
)

type bufDagReader struct {
*bytes.Reader
}

func NewBufDagReader(b []byte) *bufDagReader {
return &bufDagReader{bytes.NewReader(b)}
}

var _ DagReader = (*bufDagReader)(nil)

func (*bufDagReader) Close() error {
return nil
}

func (rd *bufDagReader) CtxReadFull(ctx context.Context, b []byte) (int, error) {
return rd.Read(b)
}

func (rd *bufDagReader) Offset() int64 {
of, err := rd.Seek(0, io.SeekCurrent)
if err != nil {
panic("this should never happen " + err.Error())
}
return of
}

func (rd *bufDagReader) Size() uint64 {
s := rd.Reader.Size()
if s < 0 {
panic("size smaller than 0 (impossible!!)")
}
return uint64(s)
}
259 changes: 8 additions & 251 deletions unixfs/io/dagreader.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
package io

import (
"bytes"
"context"
"errors"
"fmt"
"io"
"os"

mdag "github.com/ipfs/go-ipfs/merkledag"
ft "github.com/ipfs/go-ipfs/unixfs"
Expand All @@ -20,34 +18,11 @@ var ErrIsDir = errors.New("this dag node is a directory")

var ErrCantReadSymlinks = errors.New("cannot currently read symlinks")

// DagReader provides a way to easily read the data contained in a dag.
type DagReader struct {
serv mdag.DAGService

// the node being read
node *mdag.ProtoNode

// cached protobuf structure from node.Data
pbdata *ftpb.Data

// the current data buffer to be read from
// will either be a bytes.Reader or a child DagReader
buf ReadSeekCloser

// NodeGetters for each of 'nodes' child links
promises []mdag.NodeGetter

// the index of the child link currently being read from
linkPosition int

// current offset for the read head within the 'file'
offset int64

// Our context
ctx context.Context

// context cancel for children
cancel func()
type DagReader interface {
ReadSeekCloser
Size() uint64
CtxReadFull(context.Context, []byte) (int, error)
Offset() int64
}

type ReadSeekCloser interface {
Expand All @@ -59,12 +34,10 @@ type ReadSeekCloser interface {

// NewDagReader creates a new reader object that reads the data represented by
// the given node, using the passed in DAGService for data retreival
func NewDagReader(ctx context.Context, n node.Node, serv mdag.DAGService) (*DagReader, error) {
func NewDagReader(ctx context.Context, n node.Node, serv mdag.DAGService) (DagReader, error) {
switch n := n.(type) {
case *mdag.RawNode:
return &DagReader{
buf: NewRSNCFromBytes(n.RawData()),
}, nil
return NewBufDagReader(n.RawData()), nil
case *mdag.ProtoNode:
pb := new(ftpb.Data)
if err := proto.Unmarshal(n.Data(), pb); err != nil {
Expand All @@ -76,7 +49,7 @@ func NewDagReader(ctx context.Context, n node.Node, serv mdag.DAGService) (*DagR
// Dont allow reading directories
return nil, ErrIsDir
case ftpb.Data_File, ftpb.Data_Raw:
return NewDataFileReader(ctx, n, pb, serv), nil
return NewPBFileReader(ctx, n, pb, serv), nil
case ftpb.Data_Metadata:
if len(n.Links()) == 0 {
return nil, errors.New("incorrectly formatted metadata object")
Expand All @@ -100,219 +73,3 @@ func NewDagReader(ctx context.Context, n node.Node, serv mdag.DAGService) (*DagR
return nil, fmt.Errorf("unrecognized node type")
}
}

func NewDataFileReader(ctx context.Context, n *mdag.ProtoNode, pb *ftpb.Data, serv mdag.DAGService) *DagReader {
fctx, cancel := context.WithCancel(ctx)
promises := mdag.GetDAG(fctx, serv, n)
return &DagReader{
node: n,
serv: serv,
buf: NewRSNCFromBytes(pb.GetData()),
promises: promises,
ctx: fctx,
cancel: cancel,
pbdata: pb,
}
}

// precalcNextBuf follows the next link in line and loads it from the
// DAGService, setting the next buffer to read from
func (dr *DagReader) precalcNextBuf(ctx context.Context) error {
dr.buf.Close() // Just to make sure
if dr.linkPosition >= len(dr.promises) {
return io.EOF
}

nxt, err := dr.promises[dr.linkPosition].Get(ctx)
if err != nil {
return err
}
dr.linkPosition++

switch nxt := nxt.(type) {
case *mdag.ProtoNode:
pb := new(ftpb.Data)
err = proto.Unmarshal(nxt.Data(), pb)
if err != nil {
return fmt.Errorf("incorrectly formatted protobuf: %s", err)
}

switch pb.GetType() {
case ftpb.Data_Directory:
// A directory should not exist within a file
return ft.ErrInvalidDirLocation
case ftpb.Data_File:
dr.buf = NewDataFileReader(dr.ctx, nxt, pb, dr.serv)
return nil
case ftpb.Data_Raw:
dr.buf = NewRSNCFromBytes(pb.GetData())
return nil
case ftpb.Data_Metadata:
return errors.New("shouldnt have had metadata object inside file")
case ftpb.Data_Symlink:
return errors.New("shouldnt have had symlink inside file")
default:
return ft.ErrUnrecognizedType
}
case *mdag.RawNode:
dr.buf = NewRSNCFromBytes(nxt.RawData())
return nil
default:
return errors.New("unrecognized node type in DagReader")
}
}

// Size return the total length of the data from the DAG structured file.
func (dr *DagReader) Size() uint64 {
return dr.pbdata.GetFilesize()
}

// Read reads data from the DAG structured file
func (dr *DagReader) Read(b []byte) (int, error) {
return dr.CtxReadFull(dr.ctx, b)
}

// CtxReadFull reads data from the DAG structured file
func (dr *DagReader) CtxReadFull(ctx context.Context, b []byte) (int, error) {
// If no cached buffer, load one
total := 0
for {
// Attempt to fill bytes from cached buffer
n, err := dr.buf.Read(b[total:])
total += n
dr.offset += int64(n)
if err != nil {
// EOF is expected
if err != io.EOF {
return total, err
}
}

// If weve read enough bytes, return
if total == len(b) {
return total, nil
}

// Otherwise, load up the next block
err = dr.precalcNextBuf(ctx)
if err != nil {
return total, err
}
}
}

func (dr *DagReader) WriteTo(w io.Writer) (int64, error) {
// If no cached buffer, load one
total := int64(0)
for {
// Attempt to write bytes from cached buffer
n, err := dr.buf.WriteTo(w)
total += n
dr.offset += n
if err != nil {
if err != io.EOF {
return total, err
}
}

// Otherwise, load up the next block
err = dr.precalcNextBuf(dr.ctx)
if err != nil {
if err == io.EOF {
return total, nil
}
return total, err
}
}
}

func (dr *DagReader) Close() error {
dr.cancel()
return nil
}

func (dr *DagReader) Offset() int64 {
return dr.offset
}

// Seek implements io.Seeker, and will seek to a given offset in the file
// interface matches standard unix seek
// TODO: check if we can do relative seeks, to reduce the amount of dagreader
// recreations that need to happen.
func (dr *DagReader) Seek(offset int64, whence int) (int64, error) {
switch whence {
case os.SEEK_SET:
if offset < 0 {
return -1, errors.New("Invalid offset")
}

// Grab cached protobuf object (solely to make code look cleaner)
pb := dr.pbdata

// left represents the number of bytes remaining to seek to (from beginning)
left := offset
if int64(len(pb.Data)) >= offset {
// Close current buf to close potential child dagreader
dr.buf.Close()
dr.buf = NewRSNCFromBytes(pb.GetData()[offset:])

// start reading links from the beginning
dr.linkPosition = 0
dr.offset = offset
return offset, nil
} else {
// skip past root block data
left -= int64(len(pb.Data))
}

// iterate through links and find where we need to be
for i := 0; i < len(pb.Blocksizes); i++ {
if pb.Blocksizes[i] > uint64(left) {
dr.linkPosition = i
break
} else {
left -= int64(pb.Blocksizes[i])
}
}

// start sub-block request
err := dr.precalcNextBuf(dr.ctx)
if err != nil {
return 0, err
}

// set proper offset within child readseeker
n, err := dr.buf.Seek(left, os.SEEK_SET)
if err != nil {
return -1, err
}

// sanity
left -= n
if left != 0 {
return -1, errors.New("failed to seek properly")
}
dr.offset = offset
return offset, nil
case os.SEEK_CUR:
// TODO: be smarter here
noffset := dr.offset + offset
return dr.Seek(noffset, os.SEEK_SET)
case os.SEEK_END:
noffset := int64(dr.pbdata.GetFilesize()) - offset
return dr.Seek(noffset, os.SEEK_SET)
default:
return 0, errors.New("invalid whence")
}
}

// readSeekNopCloser wraps a bytes.Reader to implement ReadSeekCloser
type readSeekNopCloser struct {
*bytes.Reader
}

func NewRSNCFromBytes(b []byte) ReadSeekCloser {
return &readSeekNopCloser{bytes.NewReader(b)}
}

func (r *readSeekNopCloser) Close() error { return nil }
2 changes: 1 addition & 1 deletion unixfs/io/dagreader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func TestReaderSzie(t *testing.T) {
}
}

func readByte(t testing.TB, reader *DagReader) byte {
func readByte(t testing.TB, reader DagReader) byte {
out := make([]byte, 1)
c, err := reader.Read(out)

Expand Down
Loading

0 comments on commit 0fcb92f

Please sign in to comment.