forked from couchbaselabs/cbfs
-
Notifications
You must be signed in to change notification settings - Fork 0
/
ioutil.go
113 lines (95 loc) · 1.99 KB
/
ioutil.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package main
import (
"errors"
"io"
"math/rand"
"time"
)
// Timeout is the error a closingPipe closes its pipe with when forwarding
// data into the pipe does not finish within the configured timeout.
var Timeout = errors.New("Timeout")
// randomDataMaker is an endless reader of pseudo-random bytes drawn from src.
type randomDataMaker struct {
	src rand.Source
}

// Read fills p completely with pseudo-random bytes and returns len(p), nil.
// It never returns an error.
//
// Each call to src.Int63 supplies up to eight bytes, least-significant byte
// first. Int63 yields a non-negative 63-bit value, so when all eight bytes of
// a value are consumed the top bit of the eighth byte is always zero.
//
// An empty (or nil) p returns 0, nil without drawing from src.
func (r *randomDataMaker) Read(p []byte) (n int, err error) {
	for n < len(p) {
		val := r.src.Int63()
		// Peel bytes off the value until it is exhausted or p is full.
		for i := 0; i < 8 && n < len(p); i++ {
			p[n] = byte(val)
			val >>= 8
			n++
		}
	}
	return n, nil
}
// copyRes carries the outcome of an io.Copy call; bgCopy sends one on its
// result channel when the copy finishes.
type copyRes struct {
// s is the byte count returned by io.Copy.
s int64
// e is the error returned by io.Copy.
e error
}
// ErrorCloser is an io.ReadCloser that can also be closed with a
// caller-supplied error.
type ErrorCloser interface {
io.ReadCloser
// CloseWithError closes the reader, passing err as the reason.
CloseWithError(error) error
}
// bgCopy copies everything from r to w using io.Copy and then reports the
// byte count and error as a single copyRes on ch. The send blocks until ch
// can accept the value.
// NOTE(review): the name suggests this is meant to be launched with `go`;
// confirm against callers.
func bgCopy(w io.Writer, r io.Reader, ch chan<- copyRes) {
	var res copyRes
	res.s, res.e = io.Copy(w, r)
	ch <- res
}
// closingPipe reads from r and copies every chunk it reads into the write
// end of an io.Pipe, closing both pipe ends when the source fails, when it
// is closed, or when a pipe write takes longer than timeout.
type closingPipe struct {
// r is the source reader that Read pulls data from.
r io.Reader
// pr and pw are the two ends of the pipe the data is copied into.
pr *io.PipeReader
pw *io.PipeWriter
// err holds the most recent error the pipe was closed with.
err error
// timeout bounds how long a single write into pw may take before the
// pipe is closed with Timeout.
timeout time.Duration
}
// Read reads from the source reader into p and, when any bytes were read,
// copies them into the write end of the pipe before returning.
//
// While the pipe write is in flight a timer is armed; if it fires after
// cp.timeout it closes the pipe with Timeout. The timer is stopped when Read
// returns. If the pipe write fails, Read returns that write's byte count and
// error. Otherwise it returns the source's count and error; a non-nil source
// error is recorded and used to close both ends of the pipe.
func (cp *closingPipe) Read(p []byte) (n int, err error) {
	n, err = cp.r.Read(p)

	if n > 0 {
		// A pipe write blocks for as long as the consumer is not reading,
		// so arm a watchdog that tears the pipe down instead of hanging.
		watchdog := time.AfterFunc(cp.timeout, func() {
			cp.CloseWithError(Timeout)
		})
		defer watchdog.Stop()

		written, werr := cp.pw.Write(p[:n])
		if werr != nil {
			return written, werr
		}
	}

	if err == nil {
		return n, nil
	}

	// The source failed (or hit EOF): propagate that to both pipe ends.
	cp.err = err
	cp.pr.CloseWithError(err)
	cp.pw.CloseWithError(err)
	return n, err
}
// CloseWithError records err and closes both ends of the pipe with it.
// The value returned is the result of closing the write end; the result of
// closing the read end is discarded.
func (cp *closingPipe) CloseWithError(err error) error {
	cp.err = err
	_ = cp.pr.CloseWithError(err)
	return cp.pw.CloseWithError(err)
}
// Close shuts down both ends of the pipe, using io.EOF as the recorded and
// propagated error. It returns the result of closing the write end.
func (cp *closingPipe) Close() error {
	return cp.CloseWithError(io.EOF)
}
// pipeErrAdaptor wraps the read end of a pipe and reports a closed pipe as
// an ordinary end of stream.
type pipeErrAdaptor struct {
	p *io.PipeReader
}

// Read reads from the wrapped pipe reader. The byte count is passed through
// unchanged; an io.ErrClosedPipe error is replaced with io.EOF, and every
// other error (including nil) is returned as-is.
func (p *pipeErrAdaptor) Read(b []byte) (int, error) {
	switch n, err := p.p.Read(b); err {
	case io.ErrClosedPipe:
		return n, io.EOF
	default:
		return n, err
	}
}
// newMultiReaderTimeout builds a pipe around r and returns two readers.
//
// The first return value reads from r and copies whatever it reads into the
// pipe; each of those pipe writes is bounded by the timeout to. The second
// return value reads from the other end of the pipe, reporting a closed pipe
// as io.EOF.
func newMultiReaderTimeout(r io.Reader, to time.Duration) (ErrorCloser, io.Reader) {
	pipeOut, pipeIn := io.Pipe()
	source := &closingPipe{
		r:       r,
		pr:      pipeOut,
		pw:      pipeIn,
		timeout: to,
	}
	return source, &pipeErrAdaptor{p: pipeOut}
}
// newMultiReader is newMultiReaderTimeout with a fixed 15-second timeout.
func newMultiReader(r io.Reader) (ErrorCloser, io.Reader) {
	const pipeWriteTimeout = 15 * time.Second
	return newMultiReaderTimeout(r, pipeWriteTimeout)
}