-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
processor.go
311 lines (265 loc) · 8.63 KB
/
processor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.
package processor
import (
"context"
"sync"
"github.com/DataDog/datadog-agent/comp/core/hostname/hostnameinterface"
"github.com/DataDog/datadog-agent/comp/logs/agent/config"
pkgconfigmodel "github.com/DataDog/datadog-agent/pkg/config/model"
"github.com/DataDog/datadog-agent/pkg/logs/diagnostic"
"github.com/DataDog/datadog-agent/pkg/logs/message"
"github.com/DataDog/datadog-agent/pkg/logs/metrics"
"github.com/DataDog/datadog-agent/pkg/logs/sds"
"github.com/DataDog/datadog-agent/pkg/util/log"
)
// UnstructuredProcessingMetricName is the telemetry metric name counting how many
// rules are used on unstructured content for tailers capable of processing both
// unstructured and structured content.
const UnstructuredProcessingMetricName = "datadog.logs_agent.tailer.unstructured_processing"
// A Processor updates messages from an inputChan and pushes
// in an outputChan.
type Processor struct {
	pipelineID int
	inputChan  chan *message.Message
	outputChan chan *message.Message // strategy input
	// ReconfigChan transports rules to use in order to reconfigure
	// the processing rules of the SDS Scanner.
	ReconfigChan    chan sds.ReconfigureOrder
	processingRules []*config.ProcessingRule
	encoder         Encoder
	// done is signaled by run() when the main loop exits; Stop() waits on it.
	done                      chan struct{}
	diagnosticMessageReceiver diagnostic.MessageReceiver
	// mu serializes message processing between the run() loop and the
	// synchronous Flush() call.
	mu       sync.Mutex
	hostname hostnameinterface.Component
	sds      sdsProcessor
}
// sdsProcessor holds the SDS scanner and the startup buffering state used
// while waiting for an SDS configuration to arrive.
type sdsProcessor struct {
	// buffer stores the messages for the buffering mechanism in case we didn't
	// receive any SDS configuration & wait_for_configuration == "buffer".
	buffer []*message.Message
	// bufferedBytes tracks the total content size of the buffered messages.
	bufferedBytes int
	// maxBufferSize is the byte budget above which oldest messages are evicted.
	maxBufferSize int
	// buffering indicates if we're buffering while waiting for an SDS configuration
	buffering bool
	scanner   *sds.Scanner // configured through RC
}
// New returns an initialized Processor.
func New(cfg pkgconfigmodel.Reader, inputChan, outputChan chan *message.Message, processingRules []*config.ProcessingRule,
	encoder Encoder, diagnosticMessageReceiver diagnostic.MessageReceiver, hostname hostnameinterface.Component,
	pipelineID int) *Processor {
	// The SDS sub-processor starts buffering right away when the agent has
	// been configured to hold messages until an SDS configuration arrives.
	sdsProc := sdsProcessor{
		buffering:     sds.ShouldBufferUntilSDSConfiguration(cfg),
		maxBufferSize: sds.WaitForConfigurationBufferMaxSize(cfg),
		scanner:       sds.CreateScanner(pipelineID),
	}

	return &Processor{
		pipelineID:                pipelineID,
		inputChan:                 inputChan,
		outputChan:                outputChan, // strategy input
		ReconfigChan:              make(chan sds.ReconfigureOrder),
		processingRules:           processingRules,
		encoder:                   encoder,
		done:                      make(chan struct{}),
		diagnosticMessageReceiver: diagnosticMessageReceiver,
		hostname:                  hostname,
		sds:                       sdsProc,
	}
}
// Start starts the Processor.
// The main loop exits when inputChan is closed (see Stop).
func (p *Processor) Start() {
	go p.run()
}
// Stop stops the Processor,
// this call blocks until inputChan is flushed
func (p *Processor) Stop() {
	// Closing the input channel makes the main loop drain and return.
	close(p.inputChan)
	// Wait for run() to signal that it has fully exited.
	<-p.done

	// The main loop is no longer running: deleting the SDS scanner
	// instance is now safe.
	if scanner := p.sds.scanner; scanner != nil {
		scanner.Delete()
		p.sds.scanner = nil
	}
}
// Flush processes synchronously the messages that this processor has to process.
// Mainly (only?) used by the Serverless Agent.
//
// It holds p.mu for the whole flush, which parks the run() loop (run locks
// p.mu after each message) so the two don't compete for long.
func (p *Processor) Flush(ctx context.Context) {
	p.mu.Lock()
	defer p.mu.Unlock()
	for {
		// honor cancellation first
		select {
		case <-ctx.Done():
			return
		default:
		}
		// Non-blocking receive: the previous `len(inputChan) == 0` check
		// followed by a blocking receive was racy — run() could consume the
		// last message between the check and the receive, leaving Flush
		// blocked while holding the mutex.
		select {
		case msg, ok := <-p.inputChan:
			if !ok { // channel has been closed
				return
			}
			p.processMessage(msg)
		default:
			// nothing pending: the flush is complete
			return
		}
	}
}
// run starts the processing of the inputChan
func (p *Processor) run() {
	defer func() {
		// signal Stop() that the main loop has fully exited
		p.done <- struct{}{}
	}()
	for {
		select {
		// Processing, usual main loop
		// ---------------------------
		case msg, ok := <-p.inputChan:
			if !ok { // channel has been closed
				return
			}

			// if we have to wait for an SDS configuration to start processing & forwarding
			// the logs, that's here that we buffer the message
			if p.sds.buffering {
				// buffer until we receive a configuration
				p.sds.bufferMsg(msg)
			} else {
				// process the message
				p.processMessage(msg)
			}

			// Lock/Unlock pair: if Flush() currently holds the mutex, this
			// parks the main loop until the synchronous flush completes.
			p.mu.Lock() // block here if we're trying to flush synchronously
			//nolint:staticcheck
			p.mu.Unlock()

		// SDS reconfiguration
		// -------------------
		case order := <-p.ReconfigChan:
			p.mu.Lock()
			p.applySDSReconfiguration(order)
			p.mu.Unlock()
		}
	}
}
// applySDSReconfiguration applies the given order to the SDS scanner and
// reports the outcome on the order's response channel.
func (p *Processor) applySDSReconfiguration(order sds.ReconfigureOrder) {
	isActive, err := p.sds.scanner.Reconfigure(order)
	response := sds.ReconfigureResponse{
		IsActive: isActive,
		Err:      err,
	}

	switch {
	case err != nil:
		log.Errorf("Error while reconfiguring the SDS scanner: %v", err)
	case p.sds.buffering && isActive:
		// The scanner reconfigured without error and is active now:
		// drain the buffered messages (if any) and stop the buffering
		// mechanism.
		log.Debug("Processor ready with an SDS configuration.")
		p.sds.buffering = false

		if len(p.sds.buffer) > 0 {
			log.Info("SDS: sending", len(p.sds.buffer), "buffered messages")
			for _, msg := range p.sds.buffer {
				p.processMessage(msg)
			}
		}
		p.sds.resetBuffer()
		// No other case is needed: buffering is only a startup mechanism.
		// After the SDS scanners have been enabled, an inactive state simply
		// reflects the configuration that was sent.
	}

	order.ResponseChan <- response
}
// bufferMsg appends msg to the buffer, then evicts the oldest entries until
// the total buffered byte count fits under maxBufferSize again.
func (s *sdsProcessor) bufferMsg(msg *message.Message) {
	s.buffer = append(s.buffer, msg)
	s.bufferedBytes += len(msg.GetContent())

	// Drop messages oldest-first while over budget; a metric tracks each drop.
	for len(s.buffer) > 0 && s.bufferedBytes > s.maxBufferSize {
		oldest := s.buffer[0]
		s.buffer = s.buffer[1:]
		s.bufferedBytes -= len(oldest.GetContent())
		metrics.TlmLogsDiscardedFromSDSBuffer.Inc()
	}
}
// resetBuffer drops all buffered messages and resets the byte accounting.
func (s *sdsProcessor) resetBuffer() {
	s.buffer = nil
	s.bufferedBytes = 0
}
// processMessage applies the redacting rules to msg, then renders, encodes
// and forwards it to the output channel. Messages excluded by a rule, or
// failing to render/encode, are dropped.
func (p *Processor) processMessage(msg *message.Message) {
	metrics.LogsDecoded.Add(1)
	metrics.TlmLogsDecoded.Inc()

	// a false return means a rule excluded this message: drop it
	if !p.applyRedactingRules(msg) {
		return
	}

	metrics.LogsProcessed.Add(1)
	metrics.TlmLogsProcessed.Inc()

	// render the message
	rendered, err := msg.Render()
	if err != nil {
		log.Error("can't render the msg", err)
		return
	}
	msg.SetRendered(rendered)

	// report this message to diagnostic receivers (e.g. `stream-logs` command)
	p.diagnosticMessageReceiver.HandleMessage(msg, rendered, "")

	// encode the message to its final format, it is done in-place
	if err := p.encoder.Encode(msg, p.GetHostname(msg)); err != nil {
		log.Error("unable to encode msg ", err)
		return
	}

	p.outputChan <- msg
}
// applyRedactingRules returns given a message if we should process it or not,
// it applies the change directly on the Message content.
func (p *Processor) applyRedactingRules(msg *message.Message) bool {
	content := msg.GetContent()

	// Use the internal scrubbing implementation of the Agent
	// ---------------------------

	// Build the combined rule list in a freshly allocated slice: appending
	// the per-source rules directly to p.processingRules could write into
	// its backing array when it has spare capacity, corrupting the shared
	// slice for subsequent messages.
	sourceRules := msg.Origin.LogSource.Config.ProcessingRules
	rules := make([]*config.ProcessingRule, 0, len(p.processingRules)+len(sourceRules))
	rules = append(rules, p.processingRules...)
	rules = append(rules, sourceRules...)

	for _, rule := range rules {
		switch rule.Type {
		case config.ExcludeAtMatch:
			// if this message matches, we ignore it
			if rule.Regex.Match(content) {
				return false
			}
		case config.IncludeAtMatch:
			// if this message doesn't match, we ignore it
			if !rule.Regex.Match(content) {
				return false
			}
		case config.MaskSequences:
			content = rule.Regex.ReplaceAll(content, rule.Placeholder)
		}
	}

	// Use the SDS implementation
	// --------------------------

	// Global SDS scanner, applied on all log sources
	if p.sds.scanner.IsReady() {
		mutated, evtProcessed, err := p.sds.scanner.Scan(content, msg)
		if err != nil {
			log.Error("while using SDS to scan the log:", err)
		} else if mutated {
			content = evtProcessed
		}
	}

	msg.SetContent(content)
	return true // we want to send this message
}
// GetHostname returns the hostname to apply to the given log message.
func (p *Processor) GetHostname(msg *message.Message) string {
	switch {
	case msg.Hostname != "":
		// the message already carries a hostname
		return msg.Hostname
	case msg.Lambda != nil:
		// serverless messages use the Lambda ARN as hostname
		return msg.Lambda.ARN
	case p.hostname == nil:
		return "unknown"
	}

	hname, err := p.hostname.Get(context.TODO())
	if err != nil {
		// this scenario is not likely to happen since
		// the agent cannot start without a hostname
		hname = "unknown"
	}
	return hname
}