Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TraceQL Perf: Reduce metadata retrieved by implementing two pass iteration. #2119

Merged
merged 29 commits into from
Mar 6, 2023
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
6953bad
sketching out ideas
joe-elliott Feb 3, 2023
e871134
wip: traceql engine two pass
joe-elliott Feb 3, 2023
81b68bc
inteface cleanup
joe-elliott Feb 6, 2023
a7afed4
technically compiles
joe-elliott Feb 6, 2023
12b2cdc
added a way to dump the iterator tree
joe-elliott Jan 18, 2023
1bdd95a
fixin' bugs
joe-elliott Feb 6, 2023
b721648
add meta support
joe-elliott Feb 7, 2023
5d13894
test finagling
joe-elliott Feb 15, 2023
998b376
extend benchmarks
joe-elliott Feb 15, 2023
a603824
test cleanup
joe-elliott Feb 21, 2023
100ce11
removed query field
joe-elliott Feb 21, 2023
04b4ba0
lint
joe-elliott Feb 21, 2023
bc9e247
fix test
joe-elliott Feb 21, 2023
632cdce
fixed/improved bench
joe-elliott Feb 22, 2023
ca8a8af
span -> *span
joe-elliott Feb 22, 2023
09ac676
pool spans between span and batch collectors
joe-elliott Feb 22, 2023
26cd365
use shared span slice and lazily creaetd spanset
joe-elliott Feb 22, 2023
0ab4eea
fix
joe-elliott Feb 22, 2023
7da5b88
more putSpans
joe-elliott Feb 23, 2023
c9bac95
overwrite atts
joe-elliott Feb 23, 2023
beb06df
Change traceql.Span to be an interface
joe-elliott Feb 27, 2023
ebe27ee
readded span ids
joe-elliott Feb 27, 2023
8c0d60b
remove kindAsCount
joe-elliott Feb 28, 2023
218ba99
fix optimization
joe-elliott Feb 28, 2023
00b2fd2
tier out close
joe-elliott Feb 28, 2023
16f352f
Merge branch 'main' into two-pass-is-best-pass
joe-elliott Feb 28, 2023
110f716
patched up tests
joe-elliott Feb 28, 2023
5e91bab
Merge branch 'main' into two-pass-is-best-pass
joe-elliott Mar 3, 2023
0388a62
review
joe-elliott Mar 3, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions pkg/parquetquery/iters.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"sync"
"sync/atomic"

"github.com/grafana/tempo/pkg/util"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
pq "github.com/segmentio/parquet-go"
Expand Down Expand Up @@ -148,6 +149,15 @@ func (r *IteratorResult) AppendOtherValue(k string, v interface{}) {
}{k, v})
}

func (r *IteratorResult) OtherValueFromKey(k string) interface{} {
for _, e := range r.OtherEntries {
if e.Key == k {
return e.Value
}
}
return nil
}

// ToMap converts the unstructured list of data into a map containing an entry
// for each column, and the lists of values. The order of columns is
// not preseved, but the order of values within each column is.
Expand Down Expand Up @@ -184,6 +194,8 @@ func (r *IteratorResult) Columns(buffer [][]pq.Value, names ...string) [][]pq.Va

// iterator - Every iterator follows this interface and can be composed.
type Iterator interface {
fmt.Stringer

// Next returns nil when done
Next() (*IteratorResult, error)

Expand Down Expand Up @@ -288,6 +300,10 @@ func NewColumnIterator(ctx context.Context, rgs []pq.RowGroup, column int, colum
return c
}

func (c *ColumnIterator) String() string {
return fmt.Sprintf("ColumnIterator: %s \n\t%s", c.colName, util.TabOut(c.filter))
}

func (c *ColumnIterator) iterate(ctx context.Context, readSize int) {
defer close(c.ch)

Expand Down Expand Up @@ -577,6 +593,14 @@ func NewJoinIterator(definitionLevel int, iters []Iterator, pred GroupPredicate)
return &j
}

func (j *JoinIterator) String() string {
var iters string
for _, iter := range j.iters {
iters += "\n\t" + util.TabOut(iter)
}
return fmt.Sprintf("JoinIterator: %d\t%s\n%s)", j.definitionLevel, iters, j.pred)
}

func (j *JoinIterator) Next() (*IteratorResult, error) {
// Here is the algorithm for joins: On each pass of the iterators
// we remember which ones are pointing at the earliest rows. If all
Expand Down Expand Up @@ -739,6 +763,18 @@ func NewLeftJoinIterator(definitionLevel int, required, optional []Iterator, pre
return &j
}

func (j *LeftJoinIterator) String() string {
srequired := "required: "
for _, r := range j.required {
srequired += "\n\t\t" + util.TabOut(r)
}
soptional := "optional: "
for _, o := range j.optional {
soptional += "\n\t\t" + util.TabOut(o)
}
return fmt.Sprintf("LeftJoinIterator: %d\n\t%s\n\t%s\n\t%s", j.definitionLevel, srequired, soptional, j.pred)
}

func (j *LeftJoinIterator) Next() (*IteratorResult, error) {

// Here is the algorithm for joins: On each pass of the iterators
Expand Down Expand Up @@ -926,6 +962,14 @@ func NewUnionIterator(definitionLevel int, iters []Iterator, pred GroupPredicate
return &j
}

func (u *UnionIterator) String() string {
var iters string
for _, iter := range u.iters {
iters += iter.String() + ", "
}
return fmt.Sprintf("UnionIterator(%s)", iters)
}

func (u *UnionIterator) Next() (*IteratorResult, error) {
// Here is the algorithm for unions: On each pass of the iterators
// we remember which ones are pointing at the earliest same row. The
Expand Down Expand Up @@ -1040,6 +1084,8 @@ func (u *UnionIterator) Close() {
}

type GroupPredicate interface {
fmt.Stringer

KeepGroup(*IteratorResult) bool
}

Expand Down Expand Up @@ -1068,6 +1114,18 @@ func NewKeyValueGroupPredicate(keys, values []string) *KeyValueGroupPredicate {
return p
}

func (a *KeyValueGroupPredicate) String() string {
var skeys []string
var svals []string
for _, k := range a.keys {
skeys = append(skeys, string(k))
}
for _, v := range a.vals {
svals = append(svals, string(v))
}
return fmt.Sprintf("KeyValueGroupPredicate{%v, %v}", skeys, svals)
}

// KeepGroup checks if the given group contains all of the requested
// key/value pairs.
func (a *KeyValueGroupPredicate) KeepGroup(group *IteratorResult) bool {
Expand Down
1 change: 1 addition & 0 deletions pkg/parquetquery/predicate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func newAlwaysFalsePredicate() *mockPredicate {
return &mockPredicate{ret: false}
}

func (p *mockPredicate) String() string { return "mockPredicate{}" }
func (p *mockPredicate) KeepValue(parquet.Value) bool { p.valCalled = true; return p.ret }
func (p *mockPredicate) KeepPage(parquet.Page) bool { p.pageCalled = true; return p.ret }
func (p *mockPredicate) KeepColumnChunk(parquet.ColumnChunk) bool { p.chunkCalled = true; return p.ret }
Expand Down
54 changes: 54 additions & 0 deletions pkg/parquetquery/predicates.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package parquetquery

import (
"bytes"
"fmt"
"regexp"
"strings"

Expand All @@ -11,6 +12,8 @@ import (
// Predicate is a pushdown predicate that can be applied at
// the chunk, page, and value levels.
type Predicate interface {
fmt.Stringer

KeepColumnChunk(cc pq.ColumnChunk) bool
KeepPage(page pq.Page) bool
KeepValue(pq.Value) bool
Expand All @@ -36,6 +39,14 @@ func NewStringInPredicate(ss []string) Predicate {
return p
}

func (p *StringInPredicate) String() string {
var strings string
for _, s := range p.ss {
strings += fmt.Sprintf("%s, ", string(s))
}
return fmt.Sprintf("StringInPredicate{%s}", strings)
}

func (p *StringInPredicate) KeepColumnChunk(cc pq.ColumnChunk) bool {
p.helper.setNewRowGroup()

Expand Down Expand Up @@ -97,6 +108,14 @@ func NewRegexInPredicate(regs []string) (*RegexInPredicate, error) {
return p, nil
}

func (p *RegexInPredicate) String() string {
var strings string
for _, s := range p.regs {
strings += fmt.Sprintf("%s, ", s.String())
}
return fmt.Sprintf("RegexInPredicate{%s}", strings)
}

func (p *RegexInPredicate) keep(v *pq.Value) bool {
if v.IsNull() {
// Null
Expand Down Expand Up @@ -154,6 +173,10 @@ func NewSubstringPredicate(substring string) *SubstringPredicate {
}
}

func (p *SubstringPredicate) String() string {
return fmt.Sprintf("SubstringPredicate{%s}", p.substring)
}

func (p *SubstringPredicate) KeepColumnChunk(cc pq.ColumnChunk) bool {
p.helper.setNewRowGroup()

Expand Down Expand Up @@ -192,6 +215,10 @@ func NewIntBetweenPredicate(min, max int64) *IntBetweenPredicate {
return &IntBetweenPredicate{min, max}
}

func (p *IntBetweenPredicate) String() string {
return fmt.Sprintf("IntBetweenPredicate{%d,%d}", p.min, p.max)
}

func (p *IntBetweenPredicate) KeepColumnChunk(c pq.ColumnChunk) bool {

if ci := c.ColumnIndex(); ci != nil {
Expand Down Expand Up @@ -239,6 +266,10 @@ func NewGenericPredicate[T any](fn func(T) bool, rangeFn func(T, T) bool, extrac
return &GenericPredicate[T]{Fn: fn, RangeFn: rangeFn, Extract: extract}
}

func (p *GenericPredicate[T]) String() string {
return "GenericPredicate{}"
}

func (p *GenericPredicate[T]) KeepColumnChunk(c pq.ColumnChunk) bool {
p.helper.setNewRowGroup()

Expand Down Expand Up @@ -307,6 +338,10 @@ func NewFloatBetweenPredicate(min, max float64) *FloatBetweenPredicate {
return &FloatBetweenPredicate{min, max}
}

func (p *FloatBetweenPredicate) String() string {
return fmt.Sprintf("FloatBetweenPredicate{%f,%f}", p.min, p.max)
}

func (p *FloatBetweenPredicate) KeepColumnChunk(c pq.ColumnChunk) bool {

if ci := c.ColumnIndex(); ci != nil {
Expand Down Expand Up @@ -347,6 +382,14 @@ func NewOrPredicate(preds ...Predicate) *OrPredicate {
}
}

func (p *OrPredicate) String() string {
var preds string
for _, pred := range p.preds {
preds += pred.String() + ","
}
return fmt.Sprintf("OrPredicate{%s}", p.preds)
}

func (p *OrPredicate) KeepColumnChunk(c pq.ColumnChunk) bool {
ret := false
for _, p := range p.preds {
Expand Down Expand Up @@ -403,6 +446,13 @@ type InstrumentedPredicate struct {

var _ Predicate = (*InstrumentedPredicate)(nil)

func (p *InstrumentedPredicate) String() string {
if p.pred == nil {
return fmt.Sprintf("InstrumentedPredicate{%d, nil}", p.InspectedValues)
}
return fmt.Sprintf("InstrumentedPredicate{%d, %s}", p.InspectedValues, p.pred)
}

func (p *InstrumentedPredicate) KeepColumnChunk(c pq.ColumnChunk) bool {
p.InspectedColumnChunks++

Expand Down Expand Up @@ -489,6 +539,10 @@ func NewSkipNilsPredicate() *SkipNilsPredicate {
return &SkipNilsPredicate{}
}

func (p *SkipNilsPredicate) String() string {
return "SkipNilsPredicate{}"
}

func (p *SkipNilsPredicate) KeepColumnChunk(c pq.ColumnChunk) bool {
return true
}
Expand Down
18 changes: 9 additions & 9 deletions pkg/traceql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type Element interface {
type pipelineElement interface {
Element
extractConditions(request *FetchSpansRequest)
evaluate([]Spanset) ([]Spanset, error)
evaluate([]*Spanset) ([]*Spanset, error)
}

type typedExpression interface {
Expand Down Expand Up @@ -85,7 +85,7 @@ func (p Pipeline) extractConditions(req *FetchSpansRequest) {
}
}

func (p Pipeline) evaluate(input []Spanset) (result []Spanset, err error) {
func (p Pipeline) evaluate(input []*Spanset) (result []*Spanset, err error) {
result = input

for _, element := range p.Elements {
Expand All @@ -95,7 +95,7 @@ func (p Pipeline) evaluate(input []Spanset) (result []Spanset, err error) {
}

if len(result) == 0 {
return []Spanset{}, nil
return []*Spanset{}, nil
}
}

Expand All @@ -116,7 +116,7 @@ func (o GroupOperation) extractConditions(request *FetchSpansRequest) {
o.Expression.extractConditions(request)
}

func (GroupOperation) evaluate(ss []Spanset) ([]Spanset, error) {
func (GroupOperation) evaluate(ss []*Spanset) ([]*Spanset, error) {
return ss, nil
}

Expand All @@ -130,7 +130,7 @@ func newCoalesceOperation() CoalesceOperation {
func (o CoalesceOperation) extractConditions(request *FetchSpansRequest) {
}

func (CoalesceOperation) evaluate(ss []Spanset) ([]Spanset, error) {
func (CoalesceOperation) evaluate(ss []*Spanset) ([]*Spanset, error) {
return ss, nil
}

Expand Down Expand Up @@ -257,8 +257,8 @@ func newSpansetFilter(e FieldExpression) SpansetFilter {
// nolint: revive
func (SpansetFilter) __spansetExpression() {}

func (f SpansetFilter) evaluate(input []Spanset) ([]Spanset, error) {
var output []Spanset
func (f SpansetFilter) evaluate(input []*Spanset) ([]*Spanset, error) {
var output []*Spanset

for _, ss := range input {
if len(ss.Spans) == 0 {
Expand Down Expand Up @@ -287,9 +287,9 @@ func (f SpansetFilter) evaluate(input []Spanset) ([]Spanset, error) {
continue
}

matchingSpanset := ss
matchingSpanset := *ss
matchingSpanset.Spans = matchingSpans
output = append(output, matchingSpanset)
output = append(output, &matchingSpanset)
}

return output, nil
Expand Down
15 changes: 8 additions & 7 deletions pkg/traceql/ast_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ import (
"github.com/grafana/tempo/pkg/util/log"
)

func appendSpans(buffer []Span, input []Spanset) []Span {
func appendSpans(buffer []Span, input []*Spanset) []Span {
for _, i := range input {
buffer = append(buffer, i.Spans...)
}
return buffer
}

func (o SpansetOperation) evaluate(input []Spanset) (output []Spanset, err error) {
func (o SpansetOperation) evaluate(input []*Spanset) (output []*Spanset, err error) {

for i := range input {
curr := input[i : i+1]
Expand Down Expand Up @@ -57,7 +57,7 @@ func (o SpansetOperation) evaluate(input []Spanset) (output []Spanset, err error
return output, nil
}

func (f ScalarFilter) evaluate(input []Spanset) (output []Spanset, err error) {
func (f ScalarFilter) evaluate(input []*Spanset) (output []*Spanset, err error) {

// TODO we solve this gap where pipeline elements and scalar binary
// operations meet in a generic way. For now we only support well-defined
Expand Down Expand Up @@ -105,7 +105,7 @@ func (f SpansetFilter) matches(span Span) (bool, error) {
return static.B, nil
}

func (a Aggregate) evaluate(input []Spanset) (output []Spanset, err error) {
func (a Aggregate) evaluate(input []*Spanset) (output []*Spanset, err error) {

for _, ss := range input {
switch a.op {
Expand Down Expand Up @@ -265,18 +265,19 @@ func (s Static) execute(span Span) (Static, error) {
}

func (a Attribute) execute(span Span) (Static, error) {
static, ok := span.Attributes[a]
atts := span.Attributes()
static, ok := atts[a]
if ok {
return static, nil
}

if a.Scope == AttributeScopeNone {
for attribute, static := range span.Attributes {
for attribute, static := range atts {
if a.Name == attribute.Name && attribute.Scope == AttributeScopeSpan {
return static, nil
}
}
for attribute, static := range span.Attributes {
for attribute, static := range atts {
if a.Name == attribute.Name {
return static, nil
}
Expand Down
Loading