Skip to content

Commit

Permalink
Add option for limiting the number of logs in a Span
Browse files Browse the repository at this point in the history
The MaxLogsPerSpan option limits the number of logs in a Span. Events are
dropped as necessary; a log in the finished span indicates how many were
dropped.

The default limit is 100.

Fixes #38.
  • Loading branch information
RaduBerinde committed Sep 29, 2016
1 parent 15d0e11 commit c936961
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 7 deletions.
70 changes: 64 additions & 6 deletions span.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ type spanImpl struct {
event func(SpanEvent)
sync.Mutex // protects the fields below
raw RawSpan
// The number of logs dropped because of MaxLogsPerSpan.
numDroppedLogs int
}

var spanPool = &sync.Pool{New: func() interface{} {
Expand Down Expand Up @@ -98,6 +100,21 @@ func (s *spanImpl) LogKV(keyValues ...interface{}) {
s.LogFields(fields...)
}

func (s *spanImpl) appendLog(lr opentracing.LogRecord) {
maxLogs := s.tracer.options.MaxLogsPerSpan
if maxLogs == 0 || len(s.raw.Logs) < maxLogs {
s.raw.Logs = append(s.raw.Logs, lr)
return
}

// We have too many logs. We don't touch the first numOld logs; we treat the
// rest as a circular buffer and overwrite the oldest log among those.
numOld := (maxLogs - 1) / 2
numNew := maxLogs - numOld
s.raw.Logs[numOld+s.numDroppedLogs%numNew] = lr
s.numDroppedLogs++
}

func (s *spanImpl) LogFields(fields ...log.Field) {
lr := opentracing.LogRecord{
Fields: fields,
Expand All @@ -111,7 +128,7 @@ func (s *spanImpl) LogFields(fields ...log.Field) {
if lr.Timestamp.IsZero() {
lr.Timestamp = time.Now()
}
s.raw.Logs = append(s.raw.Logs, lr)
s.appendLog(lr)
}

func (s *spanImpl) LogEvent(event string) {
Expand Down Expand Up @@ -139,13 +156,30 @@ func (s *spanImpl) Log(ld opentracing.LogData) {
ld.Timestamp = time.Now()
}

s.raw.Logs = append(s.raw.Logs, ld.ToLogRecord())
s.appendLog(ld.ToLogRecord())
}

func (s *spanImpl) Finish() {
s.FinishWithOptions(opentracing.FinishOptions{})
}

// rotateLogBuffer rotates the records in the buffer: records 0 to pos-1 move at
// the end (i.e. pos circular left shifts).
func rotateLogBuffer(buf []opentracing.LogRecord, pos int) {
// This algorithm is described in:
// http://www.cplusplus.com/reference/algorithm/rotate
for first, middle, next := 0, pos, pos; first != middle; {
buf[first], buf[next] = buf[next], buf[first]
first++
next++
if next == len(buf) {
next = middle
} else if first == middle {
middle = next
}
}
}

func (s *spanImpl) FinishWithOptions(opts opentracing.FinishOptions) {
finishTime := opts.FinishTime
if finishTime.IsZero() {
Expand All @@ -155,18 +189,42 @@ func (s *spanImpl) FinishWithOptions(opts opentracing.FinishOptions) {

s.Lock()
defer s.Unlock()
if opts.LogRecords != nil {
s.raw.Logs = append(s.raw.Logs, opts.LogRecords...)

for _, lr := range opts.LogRecords {
s.appendLog(lr)
}
for _, ld := range opts.BulkLogData {
s.raw.Logs = append(s.raw.Logs, ld.ToLogRecord())
s.appendLog(ld.ToLogRecord())
}

if s.numDroppedLogs > 0 {
// We dropped some log events, which means that we used part of Logs as a
// circular buffer (see appendLog). De-circularize it.
numOld := (len(s.raw.Logs) - 1) / 2
numNew := len(s.raw.Logs) - numOld
rotateLogBuffer(s.raw.Logs[numOld:], s.numDroppedLogs%numNew)

// Replace the log in the middle (the oldest "new" log) with information
// about the dropped logs. This means that we are effectively dropping one
// more "new" log.
numDropped := s.numDroppedLogs + 1
s.raw.Logs[numOld] = opentracing.LogRecord{
// Keep the timestamp of the last dropped event.
Timestamp: s.raw.Logs[numOld].Timestamp,
Fields: []log.Field{
log.String("event", "dropped Span logs"),
log.Int("dropped_log_count", numDropped),
log.String("component", "basictracer"),
},
}
}

s.raw.Duration = duration

s.onFinish(s.raw)
s.tracer.options.Recorder.RecordSpan(s.raw)

// Last chance to get options before the span is possbily reset.
// Last chance to get options before the span is possibly reset.
poolEnabled := s.tracer.options.EnableSpanPool
if s.tracer.options.DebugAssertUseAfterFinish {
// This makes it much more likely to catch a panic on any subsequent
Expand Down
13 changes: 12 additions & 1 deletion tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,16 @@ type Options struct {
// DropAllLogs turns log events on all Spans into no-ops.
// If NewSpanEventListener is set, the callbacks will still fire.
DropAllLogs bool
// MaxLogsPerSpan limits the number of Logs in a span (if set to a nonzero
// value). If a span has more logs than this value, logs are dropped as
// necessary (and replaced with a log describing how many were dropped).
//
// About half of the MaxLogPerSpan logs kept are the oldest logs, and about
// half are the newest logs.
//
// If NewSpanEventListener is set, the callbacks will still fire for all log
// events. This value is ignored if DropAllLogs is true.
MaxLogsPerSpan int
// DebugAssertSingleGoroutine internally records the ID of the goroutine
// creating each Span and verifies that no operation is carried out on
// it on a different goroutine.
Expand Down Expand Up @@ -87,7 +97,8 @@ type Options struct {
// returned object with a Tracer.
func DefaultOptions() Options {
return Options{
ShouldSample: func(traceID uint64) bool { return traceID%64 == 0 },
ShouldSample: func(traceID uint64) bool { return traceID%64 == 0 },
MaxLogsPerSpan: 100,
}
}

Expand Down

0 comments on commit c936961

Please sign in to comment.