// Copyright (c) 2019 Dropbox, Inc.
// Full license can be found in the LICENSE file.

package goebpf

/*
#ifdef __linux__
#include <linux/perf_event.h>
#include <sys/sysinfo.h>
#define PERF_EVENT_HEADER_SIZE (sizeof(struct perf_event_header))
#else
// mocks for Mac
#define PERF_EVENT_HEADER_SIZE 8
#define PERF_RECORD_SAMPLE 9
#define PERF_RECORD_LOST 2
int get_nprocs()
{
	return 1;
}
#endif
*/
import "C"

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"sync"
)

// PerfEvents is a way to interact with Linux's PerfEvents for eBPF cases.
type PerfEvents struct {
	// Statistics
	EventsReceived   int
	EventsLost       int
	EventsUnknowType int

	// PollTimeoutMs is the timeout, in milliseconds, for the blocking poll() call.
	// Defaults to 100ms.
	PollTimeoutMs int
	poller        *perfEventPoller

	perfMap        Map
	updatesChannel chan []byte
	stopChannel    chan struct{}
	wg             sync.WaitGroup

	handlers []*perfEventHandler
}

// Go definition for C structs from
// http://man7.org/linux/man-pages/man2/perf_event_open.2.html
//
//	struct perf_event_header {
//	    __u32 type;
//	    __u16 misc;
//	    __u16 size;
//	}
type perfEventHeader struct {
	Type uint32
	Misc uint16
	Size uint16
}

//	struct perf_event_lost {
//	    uint64_t id;
//	    uint64_t lost;
//
//	    // not added: struct sample_id sample_id;
//	}
type perfEventLost struct {
	Id   uint64
	Lost uint64
}

// NewPerfEvents creates a new instance of PerfEvents for eBPF map "m".
// "m" must be of type "MapTypePerfEventArray".
func NewPerfEvents(m Map) (*PerfEvents, error) {
	if m.GetType() != MapTypePerfEventArray {
		return nil, fmt.Errorf("Invalid map type '%v'", m.GetType())
	}

	return &PerfEvents{
		perfMap:       m,
		PollTimeoutMs: 100,
	}, nil
}
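
// Usage sketch (illustrative only, not part of this package's API): obtaining
// the perf event array map and creating PerfEvents for it. "perfmap" is an
// assumed map name; "bpf" is assumed to be a System loaded from an ELF file.
//
//	bpf := goebpf.NewDefaultEbpfSystem()
//	if err := bpf.LoadElf("program.elf"); err != nil {
//	    // handle load error
//	}
//	perfMap := bpf.GetMapByName("perfmap") // must be MapTypePerfEventArray
//	pe, err := goebpf.NewPerfEvents(perfMap)
//	if err != nil {
//	    // handle error (e.g. wrong map type)
//	}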

// StartForAllProcessesAndCPUs starts PerfEvent polling on all CPUs for all system processes.
// This mode requires a specially organized map: the index must match the CPU ID.
// "bufferSize" is the ring buffer size for perf events, per CPU.
// All updates will be sent into the returned channel.
func (pe *PerfEvents) StartForAllProcessesAndCPUs(bufferSize int) (<-chan []byte, error) {
	// Get ONLINE CPU count.
	// There may be confusion between the get_nprocs() and GetNumOfPossibleCpus() functions:
	// - get_nprocs() returns ONLINE CPUs
	// - GetNumOfPossibleCpus() returns POSSIBLE (including currently offline) CPUs
	// So space for eBPF maps should be reserved for ALL possible CPUs,
	// but perf events can work only on online CPUs.
	nCpus := int(C.get_nprocs())

	// Create one perfEvent handler per online CPU
	var err error
	var handler *perfEventHandler
	pe.handlers = make([]*perfEventHandler, nCpus)
	for cpu := 0; cpu < nCpus; cpu++ {
		handler, err = newPerfEventHandler(cpu, -1, bufferSize) // pid -1 == all processes
		if err != nil {
			// Error handling to be done after the loop
			break
		}
		err = pe.perfMap.Update(cpu, int(handler.pmuFd))
		if err != nil {
			// Error handling to be done after the loop
			break
		}
		handler.Enable()
		pe.handlers[cpu] = handler
	}

	// Handle loop errors: release allocated resources / return error
	if err != nil {
		for _, handler := range pe.handlers {
			if handler != nil {
				handler.Release()
			}
		}
		return nil, err
	}

	pe.startLoop()
	return pe.updatesChannel, nil
}
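
// Usage sketch (illustrative only): consuming events from the returned channel
// and shutting down. The 4096-byte per-CPU buffer size is an arbitrary example value.
//
//	updates, err := pe.StartForAllProcessesAndCPUs(4096)
//	if err != nil {
//	    // handle error
//	}
//	go func() {
//	    for data := range updates {
//	        // "data" is the raw payload written by the eBPF program
//	        fmt.Printf("event payload: %v\n", data)
//	    }
//	}()
//	// ... later: stop polling, close the channel, release resources
//	pe.Stop()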

// Stop stops the event polling loop
func (pe *PerfEvents) Stop() {
	// Stop the poller first
	pe.poller.Stop()

	// Stop the poll loop
	close(pe.stopChannel)

	// Wait until the poll loop has stopped, then close the updates channel
	pe.wg.Wait()
	close(pe.updatesChannel)

	// Release resources
	for _, handler := range pe.handlers {
		handler.Release()
	}
}

func (pe *PerfEvents) startLoop() {
	pe.stopChannel = make(chan struct{})
	pe.updatesChannel = make(chan []byte)

	pe.wg.Add(1)
	go pe.loop()
}

func (pe *PerfEvents) loop() {
	// Set up the poller to poll all handlers (one handler per CPU)
	pe.poller = newPerfEventPoller()
	for _, handler := range pe.handlers {
		pe.poller.Add(handler)
	}

	// Start the poller
	pollerCh := pe.poller.Start(pe.PollTimeoutMs)
	defer func() {
		pe.wg.Done()
	}()

	// Wait until at least one perf event fd becomes readable (has new data)
	for {
		select {
		case handler, ok := <-pollerCh:
			if !ok {
				return
			}
			pe.handlePerfEvent(handler)

		case <-pe.stopChannel:
			return
		}
	}
}

func (pe *PerfEvents) handlePerfEvent(handler *perfEventHandler) {
	// Process all new samples at once
	for handler.ringBuffer.DataAvailable() {
		// Read the perf event header
		var header perfEventHeader
		reader := bytes.NewReader(
			handler.ringBuffer.Read(C.PERF_EVENT_HEADER_SIZE),
		)
		binary.Read(reader, binary.LittleEndian, &header)

		// Read the perf event data (header.Size is the total size of the event: header + data)
		data := handler.ringBuffer.Read(
			int(header.Size - C.PERF_EVENT_HEADER_SIZE),
		)

		// Process the event
		switch header.Type {
		case C.PERF_RECORD_SAMPLE:
			// A sample is defined as:
			//     struct perf_event_sample {
			//         struct perf_event_header header;
			//         uint32_t data_size;
			//         char data[];
			//     };
			// We've already parsed the header, so parse only data_size
			dataSize := binary.LittleEndian.Uint32(data)
			// Send the data into the channel
			pe.updatesChannel <- data[4 : dataSize+4]
			pe.EventsReceived++

		case C.PERF_RECORD_LOST:
			// This is a special record type: it reports how many records (events)
			// were lost due to a too-small buffer or slow event processing.
			var lost perfEventLost
			reader := bytes.NewReader(data)
			binary.Read(reader, binary.LittleEndian, &lost)
			pe.EventsLost += int(lost.Lost)

		default:
			pe.EventsUnknowType++
		}
	}

	// This is a ring buffer: move the tail forward to indicate
	// that we've processed the data
	handler.ringBuffer.UpdateTail()
}
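
// Decoding sketch (illustrative only): the []byte payload delivered on the
// updates channel is whatever the eBPF program emitted via bpf_perf_event_output().
// The "exampleEvent" struct below is hypothetical; it must mirror the C struct
// on the eBPF side field for field, with matching sizes and alignment.
//
//	type exampleEvent struct {
//	    Pid  uint32
//	    Comm [16]byte
//	}
//
//	var ev exampleEvent
//	if err := binary.Read(bytes.NewReader(data), binary.LittleEndian, &ev); err != nil {
//	    // payload shorter than expected: layout mismatch between Go and eBPF structs
//	}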