Add string interning to SyncIterator #3411

Merged: 14 commits, Feb 27, 2024
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,7 @@
## main / unreleased

* [ENHANCEMENT] Add string interning to TraceQL queries [#3411](https://github.com/grafana/tempo/pull/3411) (@mapno)

## v2.4.0

* [CHANGE] Merge the processors overrides set through runtime overrides and user-configurable overrides [#3125](https://github.com/grafana/tempo/pull/3125) (@kvrhdn)
78 changes: 78 additions & 0 deletions pkg/parquetquery/intern/intern.go
@@ -0,0 +1,78 @@
// Package intern is a utility for interning byte slices backing pq.Values.
// It is not safe for concurrent use.
//
// The Interner deduplicates repeated byte slices so that identical values
// share a single backing allocation. This reduces memory usage and improves
// performance when working with large datasets containing many repeated strings.
package intern

import (
"unsafe"

pq "github.com/parquet-go/parquet-go"
)

type Interner struct {
m map[string][]byte // TODO(mapno): Use swiss.Map (https://github.com/cockroachdb/swiss)
}

func New() *Interner {
return NewWithSize(0)
}

func NewWithSize(size int) *Interner {
return &Interner{m: make(map[string][]byte, size)}
}

func (i *Interner) UnsafeClone(v *pq.Value) pq.Value {
switch v.Kind() {
case pq.ByteArray, pq.FixedLenByteArray:
// Look away, this is unsafe.
a := *(*pqValue)(unsafe.Pointer(v))
a.ptr = addressOfBytes(i.internBytes(a.byteArray()))
return *(*pq.Value)(unsafe.Pointer(&a))
default:
return *v
}
}

func (i *Interner) internBytes(b []byte) []byte {
if x, ok := i.m[bytesToString(b)]; ok {
return x
}

clone := make([]byte, len(b))
copy(clone, b)
i.m[bytesToString(clone)] = clone
return clone
}

func (i *Interner) Close() {
clear(i.m) // drop all references so interned slices can be garbage collected
i.m = nil
}

// bytesToString converts a byte slice to a string.
// The returned string shares memory with the byte slice, so the
// byte slice must not be modified after the call.
func bytesToString(b []byte) string { return unsafe.String(unsafe.SliceData(b), len(b)) }

// addressOfBytes returns the address of the first byte in data.
// The data must not be modified after the call.
func addressOfBytes(data []byte) *byte { return unsafe.SliceData(data) }

// bytes converts a pointer and length back into a byte slice.
func bytes(data *byte, size int) []byte { return unsafe.Slice(data, size) }

// pqValue is a slimmer version of github.com/parquet-go/parquet-go's pq.Value.
type pqValue struct {
// data
ptr *byte
u64 uint64
// type
kind int8 // XOR(Kind) so the zero-value is <null>
}

func (v *pqValue) byteArray() []byte {
return bytes(v.ptr, int(v.u64))
}
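
For orientation, here is a minimal sketch (not part of the diff; the string values are illustrative) of how the Interner deduplicates the memory behind repeated pq.Values:

```go
package main

import (
	"fmt"

	"github.com/grafana/tempo/pkg/parquetquery/intern"
	pq "github.com/parquet-go/parquet-go"
)

func main() {
	interner := intern.New()
	defer interner.Close()

	// Two distinct values with identical string content.
	v1 := pq.ByteArrayValue([]byte("service-a"))
	v2 := pq.ByteArrayValue([]byte("service-a"))

	// UnsafeClone returns values whose byte arrays point at a single
	// interned copy instead of allocating a fresh slice per Clone().
	c1 := interner.UnsafeClone(&v1)
	c2 := interner.UnsafeClone(&v2)

	fmt.Println(&c1.ByteArray()[0] == &c2.ByteArray()[0]) // true: shared backing memory
}
```

Because the interner is not safe for concurrent use, each SyncIterator owns its own instance and releases it in Close() (see iters.go below).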
114 changes: 114 additions & 0 deletions pkg/parquetquery/intern/intern_test.go
@@ -0,0 +1,114 @@
package intern

import (
"fmt"
"testing"
"unsafe"

pq "github.com/parquet-go/parquet-go"
)

func TestInterner_internBytes(t *testing.T) {
i := New()
defer i.Close()

words := []string{"hello", "world", "hello", "world", "hello", "world"}
for _, w := range words {
_ = i.internBytes([]byte(w))
}
if len(i.m) != 2 {
// Values are interned, so there should be only 2 unique words
t.Errorf("expected 2, got %d", len(i.m))
}
interned1, interned2 := i.internBytes([]byte("hello")), i.internBytes([]byte("hello"))
if &interned1[0] != &interned2[0] {
// Values are interned, so both slices should share the same backing memory
t.Error("expected same memory address")
}
}

func TestInterner_UnsafeClone(t *testing.T) {
i := New()
defer i.Close()

value1 := pq.ByteArrayValue([]byte("foo"))
value2 := pq.ByteArrayValue([]byte("foo"))

clone1 := i.UnsafeClone(&value1)
clone2 := i.UnsafeClone(&value2)

if &clone1.ByteArray()[0] != &clone2.ByteArray()[0] {
// Clones are interned, so they should share the same backing memory
t.Error("expected same memory address")
}

if string(value1.ByteArray()) != string(value2.ByteArray()) {
// UnsafeClone does not modify the original values, so their contents should still match
t.Error("expected original values to be unchanged")
}
}

func Test_pqValue(t *testing.T) {
// Test that conversion from pq.Value to pqValue and back to pq.Value
// does not change the value.
value := pq.ByteArrayValue([]byte("foo"))
pqValue := *(*pqValue)(unsafe.Pointer(&value))
back := *(*pq.Value)(unsafe.Pointer(&pqValue))

if value.Kind() != back.Kind() {
t.Error("expected same kind")
}

if string(value.ByteArray()) != string(back.ByteArray()) {
t.Error("expected same value")
}

if value.String() != back.String() {
t.Error("expected same value")
}
}

func BenchmarkIntern(b *testing.B) {
words := []string{"foo", "bar", "baz", "qux", "quux", "corge", "grault", "garply", "waldo", "fred", "plugh", "xyzzy", "thud"}
testCases := []struct {
name string
valueFn func(i int) pq.Value
}{
{
name: "byte_array",
valueFn: func(i int) pq.Value { return pq.ByteArrayValue([]byte(words[i%len(words)])) },
},
{
name: "fixed_len_byte_array",
valueFn: func(i int) pq.Value { return pq.FixedLenByteArrayValue([]byte(words[i%len(words)])) },
},
{
name: "bool",
valueFn: func(i int) pq.Value { return pq.BooleanValue(i%2 == 0) },
},
{
name: "int32",
valueFn: func(i int) pq.Value { return pq.Int32Value(int32(i)) },
},
}

for _, tc := range testCases {
b.Run(fmt.Sprintf("no_interning: %s", tc.name), func(b *testing.B) {
for i := 0; i < b.N; i++ {
value := tc.valueFn(i)
_ = value.Clone()
}
})

b.Run(fmt.Sprintf("interning: %s", tc.name), func(b *testing.B) {
interner := New()
defer interner.Close()

b.ResetTimer()
for i := 0; i < b.N; i++ {
value := tc.valueFn(i)
_ = interner.UnsafeClone(&value)
}
})
}
}
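
To reproduce the comparison between plain Clone() and interning locally, the benchmark above can be run with standard Go tooling (package path taken from this PR's layout):

```sh
go test -bench=BenchmarkIntern -benchmem ./pkg/parquetquery/intern/
```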
38 changes: 35 additions & 3 deletions pkg/parquetquery/iters.go
@@ -10,6 +10,7 @@ import (
"sync"
"sync/atomic"

"github.com/grafana/tempo/pkg/parquetquery/intern"
"github.com/grafana/tempo/pkg/util"
"github.com/opentracing/opentracing-go"
pq "github.com/parquet-go/parquet-go"
@@ -498,6 +499,18 @@ func ReleaseResult(r *IteratorResult) {
}
}

type SyncIteratorOpt func(*SyncIterator)

// SyncIteratorOptIntern enables interning of string values.
// This is useful when the same string value is repeated many times.
// Not recommended for (very) high-cardinality columns, such as span and trace IDs.
func SyncIteratorOptIntern() SyncIteratorOpt {
return func(i *SyncIterator) {
i.intern = true
i.interner = intern.New()
}
}

// SyncIterator is like ColumnIterator but synchronous. It scans through the given row
// groups and column, and applies the optional predicate to each chunk, page, and value.
// Results are read by calling Next() until it returns nil.
@@ -526,11 +539,14 @@ type SyncIterator struct {
currBuf []pq.Value
currBufN int
currPageN int

intern bool
interner *intern.Interner
}

var _ Iterator = (*SyncIterator)(nil)

func NewSyncIterator(ctx context.Context, rgs []pq.RowGroup, column int, columnName string, readSize int, filter Predicate, selectAs string) *SyncIterator {
func NewSyncIterator(ctx context.Context, rgs []pq.RowGroup, column int, columnName string, readSize int, filter Predicate, selectAs string, opts ...SyncIteratorOpt) *SyncIterator {
// Assign row group bounds.
// Lower bound is inclusive
// Upper bound is exclusive, points at the first row of the next group
@@ -549,7 +565,8 @@ func NewSyncIterator(ctx context.Context, rgs []pq.RowGroup, column int, columnN
"column": columnName,
})

return &SyncIterator{
// Create the iterator
i := &SyncIterator{
span: span,
column: column,
columnName: columnName,
@@ -561,6 +578,13 @@ func NewSyncIterator(ctx context.Context, rgs []pq.RowGroup, column int, columnN
filter: &InstrumentedPredicate{pred: filter},
curr: EmptyRowNumber(),
}

// Apply options
for _, opt := range opts {
opt(i)
}

return i
}

func (c *SyncIterator) String() string {
@@ -933,7 +957,11 @@ func (c *SyncIterator) makeResult(t RowNumber, v *pq.Value) *IteratorResult {
r := GetResult()
r.RowNumber = t
if c.selectAs != "" {
r.AppendValue(c.selectAs, v.Clone())
if c.intern {
r.AppendValue(c.selectAs, c.interner.UnsafeClone(v))
} else {
r.AppendValue(c.selectAs, v.Clone())
}
}
return r
}
@@ -948,6 +976,10 @@ func (c *SyncIterator) Close() {
c.span.SetTag("keptPages", c.filter.KeptPages)
c.span.SetTag("keptValues", c.filter.KeptValues)
c.span.Finish()

if c.intern && c.interner != nil {
c.interner.Close()
}
}

// ColumnIterator asynchronously iterates through the given row groups and column. Applies
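
For reference, a hypothetical caller-side sketch of the new option (the helper, column names, and indices below are made up for illustration; the vparquet block_search.go changes that follow apply the same pattern with the real column constants):

```go
package example

import (
	"context"

	pq "github.com/grafana/tempo/pkg/parquetquery"
	parquet "github.com/parquet-go/parquet-go"
)

// buildIters shows the opt-in pattern: intern low-cardinality string columns,
// and leave high-cardinality ID columns on the regular Clone() path.
func buildIters(ctx context.Context, rgs []parquet.RowGroup, nameCol, traceIDCol int) (pq.Iterator, pq.Iterator) {
	// Repeated service names collapse onto shared backing slices.
	nameIter := pq.NewSyncIterator(ctx, rgs, nameCol, "resource.service.name", 1000, nil, "service.name",
		pq.SyncIteratorOptIntern())

	// Trace IDs are essentially unique, so interning would only grow the map.
	idIter := pq.NewSyncIterator(ctx, rgs, traceIDCol, "TraceID", 1000, nil, "trace:id")

	return nameIter, idIter
}
```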
6 changes: 5 additions & 1 deletion tempodb/encoding/vparquet/block_search.go
@@ -365,7 +365,11 @@ func makeIterFunc(ctx context.Context, rgs []parquet.RowGroup, pf *parquet.File)
return pq.NewColumnIterator(ctx, rgs, index, name, 1000, predicate, selectAs)
}

return pq.NewSyncIterator(ctx, rgs, index, name, 1000, predicate, selectAs)
var opts []pq.SyncIteratorOpt
if name != columnPathSpanID && name != columnPathTraceID {
opts = append(opts, pq.SyncIteratorOptIntern())
}
return pq.NewSyncIterator(ctx, rgs, index, name, 1000, predicate, selectAs, opts...)
}
}

7 changes: 6 additions & 1 deletion tempodb/encoding/vparquet2/block_search.go
@@ -365,7 +365,12 @@ func makeIterFunc(ctx context.Context, rgs []parquet.RowGroup, pf *parquet.File)
return pq.NewColumnIterator(ctx, rgs, index, name, 1000, predicate, selectAs)
}

return pq.NewSyncIterator(ctx, rgs, index, name, 1000, predicate, selectAs)
var opts []pq.SyncIteratorOpt
if name != columnPathSpanID && name != columnPathTraceID {
opts = append(opts, pq.SyncIteratorOptIntern())
}

return pq.NewSyncIterator(ctx, rgs, index, name, 1000, predicate, selectAs, opts...)
}
}

7 changes: 6 additions & 1 deletion tempodb/encoding/vparquet3/block_search.go
@@ -363,7 +363,12 @@ func makeIterFunc(ctx context.Context, rgs []parquet.RowGroup, pf *parquet.File)
return pq.NewColumnIterator(ctx, rgs, index, name, 1000, predicate, selectAs)
}

return pq.NewSyncIterator(ctx, rgs, index, name, 1000, predicate, selectAs)
var opts []pq.SyncIteratorOpt
if name != columnPathSpanID && name != columnPathTraceID {
opts = append(opts, pq.SyncIteratorOptIntern())
}

return pq.NewSyncIterator(ctx, rgs, index, name, 1000, predicate, selectAs, opts...)
}
}

15 changes: 10 additions & 5 deletions tempodb/encoding/vparquet3/block_traceql_test.go
@@ -589,11 +589,13 @@ func BenchmarkBackendBlockTraceQL(b *testing.B) {
ctx := context.TODO()
tenantID := "1"
// blockID := uuid.MustParse("00000c2f-8133-4a60-a62a-7748bd146938")
blockID := uuid.MustParse("06ebd383-8d4e-4289-b0e9-cf2197d611d5")
// blockID := uuid.MustParse("06ebd383-8d4e-4289-b0e9-cf2197d611d5")
blockID := uuid.MustParse("0008e57d-069d-4510-a001-b9433b2da08c")

r, _, _, err := local.New(&local.Config{
// Path: path.Join("/home/joe/testblock/"),
Path: path.Join("/Users/marty/src/tmp"),
// Path: path.Join("/Users/marty/src/tmp"),
Path: path.Join("/Users/mapno/workspace/testblock"),
})
require.NoError(b, err)

@@ -688,7 +690,8 @@ func BenchmarkBackendBlockQueryRange(b *testing.B) {
"{} | rate()",
"{} | rate() by (name)",
"{} | rate() by (resource.service.name)",
"{resource.service.name=`tempo-gateway`} | rate()",
"{} | rate() by (span.http.url)", // High cardinality attribute
"{resource.service.name=`loki-ingester`} | rate()",
"{status=error} | rate()",
}

@@ -697,9 +700,11 @@
e = traceql.NewEngine()
opts = common.DefaultSearchOptions()
tenantID = "1"
blockID = uuid.MustParse("06ebd383-8d4e-4289-b0e9-cf2197d611d5")
// blockID = uuid.MustParse("06ebd383-8d4e-4289-b0e9-cf2197d611d5")
blockID = uuid.MustParse("0008e57d-069d-4510-a001-b9433b2da08c")
// blockID = uuid.MustParse("18364616-f80d-45a6-b2a3-cb63e203edff")
path = "/Users/marty/src/tmp/"
// path = "/Users/marty/src/tmp/"
path = "/Users/mapno/workspace/testblock"
)

r, _, _, err := local.New(&local.Config{