Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix failing SearchTagValues endpoint after startup #1813

Merged
merged 6 commits into from
Oct 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ Internal types are updated to use `scope` instead of `instrumentation_library`.
* [BUGFIX] Correctly propagate errors from the iterator layer up through the queriers [#1723](https://github.com/grafana/tempo/pull/1723) (@joe-elliott)
* [BUGFIX] Make multitenancy work with HTTP [#1781](https://github.com/grafana/tempo/pull/1781) (@gouthamve)
* [BUGFIX] Fix parquet search bug fix on http.status_code that may cause incorrect results to be returned [#1799](https://github.com/grafana/tempo/pull/1799) (@mdisibio)
* [BUGFIX] Fix failing SearchTagValues endpoint after startup [#1813](https://github.com/grafana/tempo/pull/1813) (@stoewer)

## v1.5.0 / 2022-08-17

Expand Down
17 changes: 13 additions & 4 deletions modules/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -652,16 +652,25 @@ func (i *instance) rediscoverLocalBlocks(ctx context.Context) ([]*localBlock, er
ib := newLocalBlock(ctx, b, i.local)
rediscoveredBlocks = append(rediscoveredBlocks, ib)

sb := search.OpenBackendSearchBlock(b.BlockMeta().BlockID, b.BlockMeta().TenantID, i.localReader)
level.Info(log.Logger).Log("msg", "reloaded local block", "tenantID", i.instanceID, "block", id.String(), "flushed", ib.FlushedTime())

sb, err := search.OpenBackendSearchBlock(b.BlockMeta().BlockID, b.BlockMeta().TenantID, i.localReader)
if err != nil {
if errors.Is(err, search.ErrSearchNotSupported) {
continue
}
return nil, err
}

i.blocksMtx.Lock()
i.completeBlocks = append(i.completeBlocks, ib)
i.searchCompleteBlocks[ib] = &searchLocalBlockEntry{b: sb}
i.blocksMtx.Unlock()

level.Info(log.Logger).Log("msg", "reloaded local block", "tenantID", i.instanceID, "block", id.String(), "flushed", ib.FlushedTime())
}

i.blocksMtx.Lock()
i.completeBlocks = append(i.completeBlocks, rediscoveredBlocks...)
i.blocksMtx.Unlock()

return rediscoveredBlocks, nil
}

Expand Down
7 changes: 4 additions & 3 deletions tempodb/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,16 @@ type Writer interface {
WriteBlockMeta(ctx context.Context, meta *BlockMeta) error
// Append starts or continues an Append job. Pass nil to AppendTracker to start a job.
Append(ctx context.Context, name string, blockID uuid.UUID, tenantID string, tracker AppendTracker, buffer []byte) (AppendTracker, error)
// Closes any resources associated with the AppendTracker
// CloseAppend closes any resources associated with the AppendTracker
CloseAppend(ctx context.Context, tracker AppendTracker) error
// WriteTenantIndex writes the two meta slices as a tenant index
WriteTenantIndex(ctx context.Context, tenantID string, meta []*BlockMeta, compactedMeta []*CompactedBlockMeta) error
}

// Reader is a collection of methods to read data from tempodb backends
type Reader interface {
// Reader is for reading entire objects from the backend. There will be an attempt to retrieve this from cache if shouldCache is true.
// Read is for reading entire objects from the backend. There will be an attempt to retrieve this
// from cache if shouldCache is true.
Read(ctx context.Context, name string, blockID uuid.UUID, tenantID string, shouldCache bool) ([]byte, error)
// StreamReader is for streaming entire objects from the backend. It is expected this will _not_ be cached.
StreamReader(ctx context.Context, name string, blockID uuid.UUID, tenantID string) (io.ReadCloser, int64, error)
Expand All @@ -45,7 +46,7 @@ type Reader interface {
ReadRange(ctx context.Context, name string, blockID uuid.UUID, tenantID string, offset uint64, buffer []byte, shouldCache bool) error
// Tenants returns a list of all tenants in a backend
Tenants(ctx context.Context) ([]string, error)
// Blocks returns returns a list of block UUIDs given a tenant
// Blocks returns a list of block UUIDs given a tenant
Blocks(ctx context.Context, tenantID string) ([]uuid.UUID, error)
// BlockMeta returns the blockmeta given a block and tenant id
BlockMeta(ctx context.Context, blockID uuid.UUID, tenantID string) (*BlockMeta, error)
Expand Down
4 changes: 2 additions & 2 deletions tempodb/backend/block_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ type BlockMeta struct {
BlockID uuid.UUID `json:"blockID"` // Unique block id
MinID []byte `json:"minID"` // Minimum object id stored in this block
MaxID []byte `json:"maxID"` // Maximum object id stored in this block
TenantID string `json:"tenantID"` // ID of tehant to which this block belongs
StartTime time.Time `json:"startTime"` // Roughly matches when the first obj was written to this block. Used to determine block age for different purposes (cacheing, etc)
TenantID string `json:"tenantID"` // ID of tenant to which this block belongs
StartTime time.Time `json:"startTime"` // Roughly matches when the first obj was written to this block. Used to determine block age for different purposes (caching, etc)
EndTime time.Time `json:"endTime"` // Currently mostly meaningless but roughly matches to the time the last obj was written to this block
TotalObjects int `json:"totalObjects"` // Total objects in this block
Size uint64 `json:"size"` // Total size in bytes of the data object
Expand Down
1 change: 1 addition & 0 deletions tempodb/backend/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func (m *MockReader) BlockMeta(ctx context.Context, blockID uuid.UUID, tenantID

return m.M, nil
}

func (m *MockReader) Read(ctx context.Context, name string, blockID uuid.UUID, tenantID string, shouldCache bool) ([]byte, error) {
if m.ReadFn != nil {
return m.ReadFn(name, blockID, tenantID)
Expand Down
2 changes: 1 addition & 1 deletion tempodb/backend/raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type RawWriter interface {
Write(ctx context.Context, name string, keypath KeyPath, data io.Reader, size int64, shouldCache bool) error
// Append starts or continues an Append job. Pass nil to AppendTracker to start a job.
Append(ctx context.Context, name string, keypath KeyPath, tracker AppendTracker, buffer []byte) (AppendTracker, error)
// Closes any resources associated with the AppendTracker
// CloseAppend closes any resources associated with the AppendTracker.
CloseAppend(ctx context.Context, tracker AppendTracker) error
}

Expand Down
50 changes: 40 additions & 10 deletions tempodb/search/backend_search_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,25 @@ package search

import (
"context"
"fmt"
"io"

"github.com/google/uuid"
"github.com/pkg/errors"

"github.com/grafana/tempo/pkg/tempofb"
"github.com/grafana/tempo/tempodb/backend"
v2 "github.com/grafana/tempo/tempodb/encoding/v2"
"github.com/pkg/errors"
)

const (
defaultBackendSearchBlockPageSize = 2 * 1024 * 1024
searchIndexName = "search-index"
searchHeaderName = "search-header"
)

const defaultBackendSearchBlockPageSize = 2 * 1024 * 1024
var (
ErrSearchNotSupported = fmt.Errorf("flatbuffer search not supported")
)

type BackendSearchBlock struct {
id uuid.UUID
Expand Down Expand Up @@ -98,14 +106,14 @@ func NewBackendSearchBlock(input *StreamingSearchBlock, rw backend.Writer, block
if err != nil {
return err
}
err = rw.Write(ctx, "search-index", blockID, tenantID, indexBytes, true)
err = rw.Write(ctx, searchIndexName, blockID, tenantID, indexBytes, true)
if err != nil {
return err
}

// Write header
hb := header.ToBytes()
err = rw.Write(ctx, "search-header", blockID, tenantID, hb, true)
err = rw.Write(ctx, searchHeaderName, blockID, tenantID, hb, true)
if err != nil {
return err
}
Expand All @@ -121,12 +129,22 @@ func NewBackendSearchBlock(input *StreamingSearchBlock, rw backend.Writer, block
}

// OpenBackendSearchBlock opens the search data for an existing block in the given backend.
func OpenBackendSearchBlock(blockID uuid.UUID, tenantID string, r backend.Reader) *BackendSearchBlock {
return &BackendSearchBlock{
func OpenBackendSearchBlock(blockID uuid.UUID, tenantID string, r backend.Reader) (*BackendSearchBlock, error) {
b := &BackendSearchBlock{
id: blockID,
tenantID: tenantID,
r: r,
}

supported, err := b.isSearchSupported(context.TODO())
if err != nil {
return nil, errors.Wrap(err, "unable to open search block")
}
if !supported {
return nil, errors.Wrap(ErrSearchNotSupported, "unable to open search block")
}

return b, nil
}

// BlockID provides access to the private field id
Expand Down Expand Up @@ -190,7 +208,7 @@ func (s *BackendSearchBlock) Search(ctx context.Context, p Pipeline, sr *Results

// Read header
// Verify something in the block matches by checking the header
hb, err := s.r.Read(ctx, "search-header", s.id, s.tenantID, true)
hb, err := s.r.Read(ctx, searchHeaderName, s.id, s.tenantID, true)
if err != nil {
return err
}
Expand All @@ -208,7 +226,7 @@ func (s *BackendSearchBlock) Search(ctx context.Context, p Pipeline, sr *Results

// Read index
bmeta := backend.NewBlockMeta(s.tenantID, s.id, meta.Version, meta.Encoding, "")
cr := backend.NewContextReader(bmeta, "search-index", s.r, false)
cr := backend.NewContextReader(bmeta, searchIndexName, s.r, false)

ir, err := v2.NewIndexReader(cr, int(meta.IndexPageSize), int(meta.IndexRecords))
if err != nil {
Expand Down Expand Up @@ -279,9 +297,21 @@ func (s *BackendSearchBlock) Search(ctx context.Context, p Pipeline, sr *Results
}

func (s *BackendSearchBlock) readSearchHeader(ctx context.Context) (*tempofb.SearchBlockHeader, error) {
hb, err := s.r.Read(ctx, "search-header", s.id, s.tenantID, true)
hb, err := s.r.Read(ctx, searchHeaderName, s.id, s.tenantID, true)
if err != nil {
return nil, err
}
return tempofb.GetRootAsSearchBlockHeader(hb, 0), nil
}

func (s *BackendSearchBlock) isSearchSupported(ctx context.Context) (bool, error) {
buffer := make([]byte, 1)
err := s.r.ReadRange(ctx, searchHeaderName, s.id, s.tenantID, 0, buffer, false)
if err != nil {
if errors.Is(err, backend.ErrDoesNotExist) {
return false, nil
}
return false, err
}
return true, nil
}
25 changes: 23 additions & 2 deletions tempodb/search/backend_search_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"os"
"path"
"path/filepath"
"sort"
"strconv"
"sync"
Expand All @@ -18,6 +19,7 @@ import (
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/backend/local"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -53,7 +55,9 @@ func newBackendSearchBlockWithTraces(t testing.TB, traceCount int, enc backend.E
err = NewBackendSearchBlock(b1, backend.NewWriter(l), blockID, testTenantID, enc, pageSizeBytes)
require.NoError(t, err)

b2 := OpenBackendSearchBlock(blockID, testTenantID, backend.NewReader(l))
b2, err := OpenBackendSearchBlock(blockID, testTenantID, backend.NewReader(l))
require.NoError(t, err)

return b2
}

Expand Down Expand Up @@ -85,7 +89,8 @@ func TestBackendSearchBlockSearch(t *testing.T) {
err = NewBackendSearchBlock(b1, backend.NewWriter(l), blockID, testTenantID, enc, 0)
require.NoError(t, err)

b2 := OpenBackendSearchBlock(blockID, testTenantID, backend.NewReader(l))
b2, err := OpenBackendSearchBlock(blockID, testTenantID, backend.NewReader(l))
require.NoError(t, err)

// Perform test suite

Expand Down Expand Up @@ -173,6 +178,22 @@ func TestBackendSearchBlockFinalSize(t *testing.T) {
}
}

func TestBackendSearchBlockSearchNotSupported(t *testing.T) {
blockID := uuid.New()
tmpDir := t.TempDir()
l, err := local.NewBackend(&local.Config{
Path: tmpDir,
})
require.NoError(t, err)

err = os.MkdirAll(filepath.Join(tmpDir, testTenantID, blockID.String()), 0755)
require.NoError(t, err)

_, err = OpenBackendSearchBlock(blockID, testTenantID, backend.NewReader(l))
require.Error(t, err)
assert.ErrorIs(t, err, ErrSearchNotSupported)
}

func BenchmarkBackendSearchBlockSearch(b *testing.B) {
pageSizesMB := []float32{0.5, 1, 2}

Expand Down
3 changes: 1 addition & 2 deletions tempodb/tempodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,8 +256,7 @@ func (rw *readerWriter) CompleteSearchBlockWithBackend(block *search.StreamingSe
return nil, err
}

b := search.OpenBackendSearchBlock(blockID, tenantID, r)
return b, nil
return search.OpenBackendSearchBlock(blockID, tenantID, r)
}

func (rw *readerWriter) WAL() *wal.WAL {
Expand Down