Add support for sending long running traces
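With this change the vulture can emit "long running" traces: after a trace's initial batches are written, it may keep receiving additional batches on a separate backoff interval, and the query-side reconstruction is updated to expect those extra batches.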
zalegrala committed Sep 15, 2021
1 parent b80ef06 commit 75a3c00
Showing 1 changed file with 124 additions and 54 deletions.
178 changes: 124 additions & 54 deletions cmd/tempo-vulture/main.go
@@ -34,14 +34,15 @@ var (
prometheusListenAddress string
prometheusPath string

tempoQueryURL string
tempoPushURL string
tempoOrgID string
tempoWriteBackoffDuration time.Duration
tempoReadBackoffDuration time.Duration
tempoSearchBackoffDuration time.Duration
tempoRetentionDuration time.Duration
tempoSearchRetentionDuration time.Duration
tempoQueryURL string
tempoPushURL string
tempoOrgID string
tempoWriteBackoffDuration time.Duration
tempoLongWriteBackoffDuration time.Duration
tempoReadBackoffDuration time.Duration
tempoSearchBackoffDuration time.Duration
tempoRetentionDuration time.Duration
tempoSearchRetentionDuration time.Duration

logger *zap.Logger
)
@@ -56,6 +57,13 @@ type traceMetrics struct {
notFoundSearchAttribute int
}

type traceInfo struct {
timestamp time.Time
r *rand.Rand
traceIDHigh int64
traceIDLow int64
}
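
The new traceInfo type bundles everything needed to regenerate a trace deterministically: the seed timestamp, the RNG derived from it, and the two 64-bit halves of the trace ID. A minimal sketch of how such a value could be constructed, assuming the existing newRand helper (whose body is outside this diff) seeds the RNG from the epoch; newTraceInfo is a hypothetical name, not part of the commit:

```go
// Sketch only, not part of the commit; seeding the RNG from the Unix timestamp
// is an assumption based on how the seed is used elsewhere in this file.
func newTraceInfo(timestamp time.Time) *traceInfo {
	r := rand.New(rand.NewSource(timestamp.Unix()))
	return &traceInfo{
		timestamp:   timestamp,
		r:           r,
		traceIDHigh: r.Int63(), // together these form the 128-bit trace ID,
		traceIDLow:  r.Int63(), // logged as fmt.Sprintf("%016x%016x", high, low)
	}
}
```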

func init() {
flag.StringVar(&prometheusPath, "prometheus-path", "/metrics", "The path to publish Prometheus metrics to.")
flag.StringVar(&prometheusListenAddress, "prometheus-listen-address", ":80", "The address to listen on for Prometheus scrapes.")
@@ -64,6 +72,7 @@ func init() {
flag.StringVar(&tempoPushURL, "tempo-push-url", "", "The URL (scheme://hostname:port) at which to push traces to Tempo.")
flag.StringVar(&tempoOrgID, "tempo-org-id", "", "The orgID to query in Tempo")
flag.DurationVar(&tempoWriteBackoffDuration, "tempo-write-backoff-duration", 15*time.Second, "The amount of time to pause between write Tempo calls")
flag.DurationVar(&tempoLongWriteBackoffDuration, "tempo-long-write-backoff-duration", 1*time.Minute, "The amount of time to pause between long write Tempo calls")
flag.DurationVar(&tempoReadBackoffDuration, "tempo-read-backoff-duration", 30*time.Second, "The amount of time to pause between read Tempo calls")
flag.DurationVar(&tempoSearchBackoffDuration, "tempo-search-backoff-duration", 60*time.Second, "The amount of time to pause between search Tempo calls")
flag.DurationVar(&tempoRetentionDuration, "tempo-retention-duration", 336*time.Hour, "The block retention that Tempo is using")
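
The new -tempo-long-write-backoff-duration flag (default 1m) controls how long the vulture pauses between successive rounds of batches sent to the same long-running trace, while the existing -tempo-write-backoff-duration continues to control how often brand-new traces are started.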
@@ -91,7 +100,7 @@ func main() {

// Write
go func() {
c, err := newJaegerGRPCClient(tempoPushURL)
client, err := newJaegerGRPCClient(tempoPushURL)
if err != nil {
panic(err)
}
Expand All @@ -103,28 +112,15 @@ func main() {
traceIDHigh := r.Int63()
traceIDLow := r.Int63()

log := logger.With(
zap.String("org_id", tempoOrgID),
zap.String("write_trace_id", fmt.Sprintf("%016x%016x", traceIDHigh, traceIDLow)),
zap.Int64("seed", timestamp.Unix()),
)
log.Info("sending trace")

for i := int64(0); i < generateRandomInt(1, 100, r); i++ {
ctx := user.InjectOrgID(context.Background(), tempoOrgID)
ctx, err := user.InjectIntoGRPCRequest(ctx)
if err != nil {
log.Error("error injecting org id", zap.Error(err))
metricErrorTotal.Inc()
continue
}
err = c.EmitBatch(ctx, makeThriftBatch(traceIDHigh, traceIDLow, r, timestamp))
if err != nil {
log.Error("error pushing batch to Tempo", zap.Error(err))
metricErrorTotal.Inc()
continue
}
info := &traceInfo{
timestamp: timestamp,
r: r,
traceIDHigh: traceIDHigh,
traceIDLow: traceIDLow,
}

emitBatches(client, info)
queueFutureBatches(client, info)
}
}()

@@ -205,6 +201,54 @@ func main() {
log.Fatal(http.ListenAndServe(prometheusListenAddress, nil))
}

func emitBatches(c *jaeger_grpc.Reporter, t *traceInfo) {
log := logger.With(
zap.String("org_id", tempoOrgID),
zap.String("write_trace_id", fmt.Sprintf("%016x%016x", t.traceIDHigh, t.traceIDLow)),
zap.Int64("seed", t.timestamp.Unix()),
)

log.Info("sending trace")

for i := int64(0); i < generateRandomInt(1, 100, t.r); i++ {
ctx := user.InjectOrgID(context.Background(), tempoOrgID)
ctx, err := user.InjectIntoGRPCRequest(ctx)
if err != nil {
log.Error("error injecting org id", zap.Error(err))
metricErrorTotal.Inc()
continue
}
err = c.EmitBatch(ctx, makeThriftBatch(t.traceIDHigh, t.traceIDLow, t.r, t.timestamp))
if err != nil {
log.Error("error pushing batch to Tempo", zap.Error(err))
metricErrorTotal.Inc()
continue
}
}

}
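
emitBatches sends a pseudo-random number of batches per call, with the count drawn by generateRandomInt, which is defined elsewhere in this file and not shown in this diff. A plausible sketch of that helper, under the assumption that it returns a value in [min, max):

```go
// Assumption: the real generateRandomInt may differ in its exact bounds.
func generateRandomInt(min, max int64, r *rand.Rand) int64 {
	return min + r.Int63n(max-min)
}
```

Because both the batch count and the batch contents are drawn from the traceInfo's own RNG, a reader seeded the same way can replay the same sequence later.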

func queueFutureBatches(client *jaeger_grpc.Reporter, info *traceInfo) {
log := logger.With(
zap.String("org_id", tempoOrgID),
zap.String("write_trace_id", fmt.Sprintf("%016x%016x", info.traceIDHigh, info.traceIDLow)),
zap.Int64("seed", info.timestamp.Unix()),
)

if maybe(info.r) {
log.Info("queueing future batches")
go func() {
time.Sleep(tempoLongWriteBackoffDuration)
emitBatches(client, info)
queueFutureBatches(client, info)
}()
}
}

func maybe(r *rand.Rand) bool {
return r.Intn(10) >= 8
}
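
maybe returns true only for Intn(10) values of 8 or 9, i.e. on roughly 20% of calls, so each long-running trace continues for another round with probability 0.2 and the number of extra rounds follows a geometric distribution (0.25 expected extra rounds per trace). A quick standalone check of that rate:

```go
package main

import (
	"fmt"
	"math/rand"
)

func main() {
	r := rand.New(rand.NewSource(1))
	hits := 0
	const n = 100000
	for i := 0; i < n; i++ {
		if r.Intn(10) >= 8 { // same condition as maybe()
			hits++
		}
	}
	fmt.Printf("maybe() true rate: %.3f\n", float64(hits)/n) // prints ~0.200
}
```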

func pushMetrics(metrics traceMetrics) {
metricTracesInspected.Add(float64(metrics.requested))
metricTracesErrors.WithLabelValues("incorrectresult").Add(float64(metrics.incorrectResult))
@@ -507,40 +551,66 @@ func hasMissingSpans(t *tempopb.Trace) bool {

func constructTraceFromEpoch(epoch time.Time) *tempopb.Trace {
r := newRand(epoch)
traceIDHigh := r.Int63()
traceIDLow := r.Int63()

info := &traceInfo{
timestamp: epoch,
r: r,
traceIDHigh: r.Int63(),
traceIDLow: r.Int63(),
}

trace := &tempopb.Trace{}

for i := int64(0); i < generateRandomInt(1, 100, r); i++ {
batch := makeThriftBatch(traceIDHigh, traceIDLow, r, epoch)
internalTrace := jaegerTrans.ThriftBatchToInternalTraces(batch)
conv, err := internalTrace.ToOtlpProtoBytes()
if err != nil {
logger.Error(err.Error())
}
addBatches := func(t *traceInfo, trace *tempopb.Trace) {
for i := int64(0); i < generateRandomInt(1, 100, r); i++ {
batch := makeThriftBatch(t.traceIDHigh, t.traceIDLow, r, epoch)
internalTrace := jaegerTrans.ThriftBatchToInternalTraces(batch)
conv, err := internalTrace.ToOtlpProtoBytes()
if err != nil {
logger.Error(err.Error())
}

t := tempopb.Trace{}
err = t.Unmarshal(conv)
if err != nil {
logger.Error(err.Error())
}
t := tempopb.Trace{}
err = t.Unmarshal(conv)
if err != nil {
logger.Error(err.Error())
}

// Due to the several transforms above, some manual mangling is required to
// get the parentSpanID to match. In the case of an empty []byte in place
// for the ParentSpanId, we set to nil here to ensure that the final result
// matches the json.Unmarshal value when tempo is queried.
for _, b := range t.Batches {
for _, l := range b.InstrumentationLibrarySpans {
for _, s := range l.Spans {
if len(s.GetParentSpanId()) == 0 {
s.ParentSpanId = nil
// Due to the several transforms above, some manual mangling is required to
// get the parentSpanID to match. In the case of an empty []byte in place
// for the ParentSpanId, we set to nil here to ensure that the final result
// matches the json.Unmarshal value when tempo is queried.
for _, b := range t.Batches {
for _, l := range b.InstrumentationLibrarySpans {
for _, s := range l.Spans {
if len(s.GetParentSpanId()) == 0 {
s.ParentSpanId = nil
}
}
}
}

trace.Batches = append(trace.Batches, t.Batches...)
}
}

trace.Batches = append(trace.Batches, t.Batches...)
addBatches(info, trace)

lastWrite := info.timestamp
for maybe(info.r) {
lastWrite = lastWrite.Add(tempoLongWriteBackoffDuration)

log := logger.With(
zap.Int64("seed", info.timestamp.Unix()),
zap.Duration("since", time.Since(lastWrite)),
)

log.Info("checking expectedEnd")

if time.Since(lastWrite) > (1 * time.Second) {
log.Info("adding batches")
addBatches(info, trace)
}
}

return trace
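
On the read side, constructTraceFromEpoch now mirrors the writer: it rebuilds the traceInfo from the seed, adds the initial round of batches, and then keeps consulting maybe with the same RNG, appending another round for each long-write interval that has already elapsed; the > 1s guard on time.Since(lastWrite) presumably gives the writer a moment to actually send a round before the reader expects it. With the default 1m long-write backoff, a trace seeded five minutes ago whose RNG keeps saying "continue" would be expected to contain up to five extra rounds of batches.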