Skip to content

Commit

Permalink
Merge pull request #5162 from ipfs/feat/improve-preload
Browse files Browse the repository at this point in the history
always try to read ahead by at least 5 blocks in the PBDagReader
  • Loading branch information
whyrusleeping authored Jul 17, 2018
2 parents 2dcb7f0 + 7fd3404 commit 3218703
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 9 deletions.
35 changes: 35 additions & 0 deletions unixfs/io/dagreader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,41 @@ func TestSeekAndReadLarge(t *testing.T) {
}
}

func TestReadAndCancel(t *testing.T) {
dserv := testu.GetDAGServ()
inbuf := make([]byte, 20000)
rand.Read(inbuf)

node := testu.GetNode(t, dserv, inbuf, testu.UseProtoBufLeaves)
ctx, closer := context.WithCancel(context.Background())
defer closer()

reader, err := NewDagReader(ctx, node, dserv)
if err != nil {
t.Fatal(err)
}

ctx, cancel := context.WithCancel(context.Background())
buf := make([]byte, 100)
_, err = reader.CtxReadFull(ctx, buf)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(buf, inbuf[0:100]) {
t.Fatal("read failed")
}
cancel()

b, err := ioutil.ReadAll(reader)
if err != nil {
t.Fatal(err)
}

if !bytes.Equal(inbuf[100:], b) {
t.Fatal("buffers not equal")
}
}

func TestRelativeSeek(t *testing.T) {
dserv := testu.GetDAGServ()
ctx, closer := context.WithCancel(context.Background())
Expand Down
42 changes: 33 additions & 9 deletions unixfs/io/pbdagreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,13 @@ func NewPBFileReader(ctx context.Context, n *mdag.ProtoNode, file *ft.FSNode, se

const preloadSize = 10

func (dr *PBDagReader) preloadNextNodes(ctx context.Context) {
beg := dr.linkPosition
func (dr *PBDagReader) preload(ctx context.Context, beg int) {
end := beg + preloadSize
if end >= len(dr.links) {
end = len(dr.links)
}

for i, p := range ipld.GetNodes(ctx, dr.serv, dr.links[beg:end]) {
dr.promises[beg+i] = p
}
copy(dr.promises[beg:], ipld.GetNodes(ctx, dr.serv, dr.links[beg:end]))
}

// precalcNextBuf follows the next link in line and loads it from the
Expand All @@ -87,15 +84,42 @@ func (dr *PBDagReader) precalcNextBuf(ctx context.Context) error {
return io.EOF
}

if dr.promises[dr.linkPosition] == nil {
dr.preloadNextNodes(ctx)
// If we drop to <= preloadSize/2 preloading nodes, preload the next 10.
for i := dr.linkPosition; i < dr.linkPosition+preloadSize/2 && i < len(dr.promises); i++ {
// TODO: check if canceled.
if dr.promises[i] == nil {
dr.preload(ctx, i)
break
}
}

nxt, err := dr.promises[dr.linkPosition].Get(ctx)
if err != nil {
dr.promises[dr.linkPosition] = nil
switch err {
case nil:
case context.DeadlineExceeded, context.Canceled:
err = ctx.Err()
if err != nil {
return ctx.Err()
}
// In this case, the context used to *preload* the node has been canceled.
// We need to retry the load with our context and we might as
// well preload some extra nodes while we're at it.
//
// Note: When using `Read`, this code will never execute as
// `Read` will use the global context. It only runs if the user
// explicitly reads with a custom context (e.g., by calling
// `CtxReadFull`).
dr.preload(ctx, dr.linkPosition)
nxt, err = dr.promises[dr.linkPosition].Get(ctx)
dr.promises[dr.linkPosition] = nil
if err != nil {
return err
}
default:
return err
}
dr.promises[dr.linkPosition] = nil

dr.linkPosition++

switch nxt := nxt.(type) {
Expand Down

0 comments on commit 3218703

Please sign in to comment.