-
Notifications
You must be signed in to change notification settings - Fork 0
/
collector.go
273 lines (233 loc) · 7.16 KB
/
collector.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
package autoprof
import (
"archive/zip"
"bytes"
"context"
"io"
"net/url"
"runtime/pprof"
"runtime/trace"
"sort"
"sync"
"time"
)
// NewZipCollector returns a Collector which will write out a profile bundle
// formatted as a zip archive to the provided io.Writer.
func NewZipCollector(w io.Writer, meta *ArchiveMeta, opt *ArchiveOptions) *Collector {
	zw := zip.NewWriter(w)
	// Each bundle record becomes one stored (uncompressed) entry in the
	// archive; profile data is generally already compact.
	createEntry := func(name string) (io.Writer, error) {
		hdr := &zip.FileHeader{Name: name, Method: zip.Store}
		return zw.CreateHeader(hdr)
	}
	return &Collector{
		meta:            meta,
		opt:             opt,
		writeFileHeader: createEntry,
		finish:          zw.Close,
	}
}
// ArchiveOptions holds user preferences and instructions for the collection
// of a profile bundle. The zero value requests no CPU profile, no execution
// trace, and no custom data sources.
type ArchiveOptions struct {
	// CPUProfileDuration is the requested duration of the CPU profile. Leave
	// at 0 to disable CPU profiling.
	CPUProfileDuration time.Duration
	// CPUProfileByteTarget is an optional soft limit on the size of the CPU
	// profile. The collector will stop the profile as soon as possible after
	// it reaches the size target. When unset, there is no limit.
	CPUProfileByteTarget int64
	// ExecutionTraceDuration is the requested duration of the execution
	// trace. Leave at 0 to disable execution tracing.
	ExecutionTraceDuration time.Duration
	// ExecutionTraceByteTarget is an optional soft limit on the size of the
	// execution trace output. The collector will stop the execution trace as
	// soon as possible after it reaches the size target. When unset, there is
	// no limit.
	ExecutionTraceByteTarget int64
	// CustomDataSources holds user-specified additional data sources. When
	// generating a zip-archived profile bundle, data from these sources will
	// be included in the "custom/" directory. The map key names will be URI
	// path-escaped and used to name the files within that directory.
	CustomDataSources map[string]*DataSource
}
// A DataSource can generate data to be included in a profile bundle.
type DataSource struct {
	// WriteTo writes this source's data to w. It is invoked once while the
	// bundle is being collected; implementations should honor cancellation
	// of ctx and return any error encountered while producing the data.
	WriteTo func(ctx context.Context, w io.Writer) error
}
// A Collector assembles and writes out a profile bundle. It cannot be reused.
type Collector struct {
	// meta describes the archive; it is written out as the "meta" record.
	meta *ArchiveMeta
	// opt controls which profiles and custom data sources are collected.
	opt *ArchiveOptions
	// writeFileHeader prepares the profile bundle to receive data for a
	// record with the provided name.
	writeFileHeader func(name string) (io.Writer, error)
	// finish completes the profile bundle, indicating that no more data will
	// be written.
	finish func() error
	// addErr holds onto any error encountered while calling the add method
	// for delayed processing.
	addErr error
}
// add copies the data produced by source into the profile bundle under the
// provided name. Failures are latched into c.addErr: after the first error,
// every subsequent call is a no-op, so the first failure is what Run reports.
func (c *Collector) add(ctx context.Context, name string, source *DataSource) {
	switch {
	case source == nil || source.WriteTo == nil:
		// Nothing to collect from this source.
		return
	case c.addErr != nil:
		// A previous record already failed; keep that error.
		return
	}
	w, err := c.writeFileHeader(name)
	if err != nil {
		c.addErr = err
		return
	}
	c.addErr = source.WriteTo(ctx, w)
}
// Run collects the specified profile bundle.
func (c *Collector) Run(ctx context.Context) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	c.add(ctx, "meta", metaSource(c.meta))
	c.add(ctx, "expvar", expvarSource())

	// The heap profile goes first so it lands at a consistent position in
	// the bundle; the remaining runtime/pprof profiles follow.
	c.add(ctx, "pprof/heap", pprofSource(pprof.Lookup("heap")))
	for _, p := range pprof.Profiles() {
		name := p.Name()
		if name == "heap" {
			continue
		}
		c.add(ctx, "pprof/"+url.PathEscape(name), pprofSource(p))
	}

	// Emit custom data sources in sorted-name order for deterministic
	// bundle layout.
	names := make([]string, 0, len(c.opt.CustomDataSources))
	for name := range c.opt.CustomDataSources {
		names = append(names, name)
	}
	sort.Strings(names)
	for _, name := range names {
		c.add(ctx, "custom/"+url.PathEscape(name), c.opt.CustomDataSources[name])
	}

	if err := c.addErr; err != nil {
		return err
	}

	if c.opt.CPUProfileDuration > 0 {
		if err := c.addCPUProfile(ctx, "pprof/profile"); err != nil {
			return err
		}
	}
	if c.opt.ExecutionTraceDuration > 0 {
		if err := c.addExecutionTrace(ctx, "pprof/trace", "pprof/profile-during-trace"); err != nil {
			return err
		}
	}
	return c.finish()
}
// addCPUProfile records a CPU profile for the configured duration, bounded
// by the optional byte target, into the bundle record with the given name.
func (c *Collector) addCPUProfile(ctx context.Context, name string) error {
	profCtx, cancel := context.WithTimeout(ctx, c.opt.CPUProfileDuration)
	defer cancel()
	return c.addTimeBasedProfile(profCtx, name, c.opt.CPUProfileByteTarget,
		pprof.StartCPUProfile, pprof.StopCPUProfile)
}
// addExecutionTrace records an execution trace into the bundle record with
// the given name. When CPU profiling is also enabled in the options, a CPU
// profile spanning the entire trace is captured at the same time and stored
// under profileName, which makes CPU samples appear within the execution
// trace (a Go 1.19 feature).
func (c *Collector) addExecutionTrace(ctx context.Context, name, profileName string) error {
	ctx, cancel := context.WithTimeout(ctx, c.opt.ExecutionTraceDuration)
	defer cancel()
	// By default, run only the execution trace.
	start := trace.Start
	stop := trace.Stop
	var cpuProfile *bytes.Buffer
	if c.opt.CPUProfileDuration > 0 {
		// CPU profiles are enabled for this bundle. Run a CPU profile that
		// wholly encompasses the execution trace, to make CPU samples appear in
		// the execution trace (new in Go 1.19).
		start = func(w io.Writer) error {
			// The CPU profile starts before the execution trace. The
			// runtime/pprof package aggregates the CPU profile in memory and
			// only writes out data during the StopCPUProfile call, so there's
			// no use in streaming the data to enforce a soft limit on its size.
			var buf bytes.Buffer
			if err := pprof.StartCPUProfile(&buf); err == nil {
				cpuProfile = &buf
			}
			// Starting the CPU profile is best-effort (it fails if one is
			// already running); the trace proceeds either way.
			return trace.Start(w)
		}
		stop = func() {
			trace.Stop()
			// The CPU profile stops after the execution trace. The execution
			// trace still has an open io.Writer to the zip archive; we'll need
			// to wait for that to finish before adding a new file.
			if cpuProfile != nil {
				pprof.StopCPUProfile()
			}
		}
	}
	traceErr := c.addTimeBasedProfile(ctx, name, c.opt.ExecutionTraceByteTarget, start, stop)
	// Only after addTimeBasedProfile returns (and the trace's archive entry
	// is complete) is it safe to open a new entry for the buffered profile.
	profileErr := func() error {
		if cpuProfile == nil {
			return nil
		}
		w, err := c.writeFileHeader(profileName)
		if err != nil {
			return err
		}
		_, err = io.Copy(w, cpuProfile)
		return err
	}()
	// The trace error takes precedence over the profile-copy error.
	if traceErr != nil {
		return traceErr
	}
	return profileErr
}
// addTimeBasedProfile runs a profile bounded by time (via ctx, which callers
// typically wrap with a timeout) and optionally by output size. It hands the
// write end of a pipe to start, streams the read end into the bundle record
// named name, and calls stop once ctx is done. When targetSize > 0 the
// profile is cancelled early once at least that many bytes have been written
// (a soft limit).
//
// If start fails — e.g. a profile of the same kind is already running — the
// record is skipped and nil is returned.
func (c *Collector) addTimeBasedProfile(ctx context.Context, name string, targetSize int64,
	start func(w io.Writer) error, stop func()) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	pr, pw := io.Pipe()
	err := start(pw)
	if err != nil {
		// A profile is already in progress, such as by an interactive request
		// to /debug/pprof/{profile,trace}
		//
		// Skip this part of the debug bundle collection.
		return nil
	}
	// Now that we know we'll have data, prepare to add it to the profile
	// bundle.
	w, err := c.writeFileHeader(name)
	if err != nil {
		// The profile is already running and writing into pw, but nothing
		// will ever drain the pipe. Close the read end first so in-flight and
		// future writes fail immediately instead of blocking, then stop the
		// profile so it isn't left running forever. (The original code
		// returned here without stopping, leaking the running profile.)
		pr.CloseWithError(err)
		stop()
		pw.Close()
		return err
	}
	if targetSize > 0 {
		// Cancel the profile (ending the <-ctx.Done() wait below) once
		// targetSize bytes have passed through to the bundle.
		w = &limitTriggerWriter{
			wr:        w,
			fn:        cancel,
			remaining: targetSize,
		}
	}
	// Stream profile data into the bundle while the profile runs; the copy
	// finishes once the write end of the pipe is closed below.
	var copyErr error
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		_, copyErr = io.Copy(w, pr)
	}()
	<-ctx.Done()
	stop()
	closeErr := pw.Close()
	wg.Wait()
	// Report the copy error in preference to the pipe-close error.
	err = copyErr
	if err == nil {
		err = closeErr
	}
	return err
}
type limitTriggerWriter struct {
wr io.Writer
fn func()
remaining int64
}
func (lw *limitTriggerWriter) Write(p []byte) (int, error) {
n, err := lw.wr.Write(p)
lw.remaining -= int64(n)
if lw.remaining <= 0 && lw.fn != nil {
lw.fn()
lw.fn = nil
}
return n, err
}