Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Panic with different composite-indexed child objects #2947

Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 59 additions & 86 deletions internal/db/fetcher/indexer_iterators.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,41 @@ type indexIterResult struct {
value []byte
}

type queryResultIterator struct {
resultIter query.Results
// indexPrefixIterator is an iterator over index keys with a specific prefix.
type indexPrefixIterator struct {
indexDesc client.IndexDescription
indexedFields []client.FieldDefinition
indexKey core.IndexDataStoreKey
matchers []valueMatcher
execInfo *ExecInfo
resultIter query.Results
ctx context.Context
store datastore.DSReaderWriter
}

var _ indexIterator = (*indexPrefixIterator)(nil)

func (iter *indexPrefixIterator) Init(ctx context.Context, store datastore.DSReaderWriter) error {
iter.ctx = ctx
iter.store = store
iter.resultIter = nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: If iter.resultIter is not nil here, it looks like it should probably be closed before clearing the ref?

I have a vague memory of the normal fetcher failing because of this some time ago, but I might be wrong.

return nil
}

func (iter *queryResultIterator) Next() (indexIterResult, error) {
// checkResultIterator lazily opens the prefix query against the store the
// first time it is needed; once resultIter exists, subsequent calls are no-ops.
func (iter *indexPrefixIterator) checkResultIterator() error {
	if iter.resultIter != nil {
		return nil
	}
	q := query.Query{Prefix: iter.indexKey.ToString()}
	res, err := iter.store.Query(iter.ctx, q)
	if err != nil {
		return err
	}
	iter.resultIter = res
	return nil
}

func (iter *indexPrefixIterator) nextResult() (indexIterResult, error) {
res, hasVal := iter.resultIter.NextSync()
if res.Error != nil {
return indexIterResult{}, res.Error
Expand All @@ -86,46 +114,31 @@ func (iter *queryResultIterator) Next() (indexIterResult, error) {
return indexIterResult{key: key, value: res.Value, foundKey: true}, nil
}

func (iter *queryResultIterator) Close() error {
return iter.resultIter.Close()
}

type eqPrefixIndexIterator struct {
queryResultIterator
indexKey core.IndexDataStoreKey
execInfo *ExecInfo
matchers []valueMatcher
}

func (iter *eqPrefixIndexIterator) Init(ctx context.Context, store datastore.DSReaderWriter) error {
resultIter, err := store.Query(ctx, query.Query{
Prefix: iter.indexKey.ToString(),
})
if err != nil {
return err
func (iter *indexPrefixIterator) Next() (indexIterResult, error) {
if err := iter.checkResultIterator(); err != nil {
return indexIterResult{}, err
}
iter.resultIter = resultIter
return nil
}

func (iter *eqPrefixIndexIterator) Next() (indexIterResult, error) {
for {
res, err := iter.queryResultIterator.Next()
res, err := iter.nextResult()
if err != nil || !res.foundKey {
return res, err
}
iter.execInfo.IndexesFetched++
doesMatch, err := executeValueMatchers(iter.matchers, res.key.Fields)
didMatch, err := executeValueMatchers(iter.matchers, res.key.Fields)
if err != nil {
return indexIterResult{}, err
}
if !doesMatch {
continue
if didMatch {
return res, nil
}
return res, err
}
}

// Close releases the underlying datastore query iterator, if one was opened.
// resultIter is created lazily in Next, so it may still be nil here; guarding
// against that avoids a nil-pointer panic when Close is called on an iterator
// that was never advanced.
func (iter *indexPrefixIterator) Close() error {
	if iter.resultIter == nil {
		return nil
	}
	return iter.resultIter.Close()
}

type eqSingleIndexIterator struct {
indexKey core.IndexDataStoreKey
execInfo *ExecInfo
Expand Down Expand Up @@ -182,7 +195,7 @@ func (iter *inIndexIterator) nextIterator() (bool, error) {
}

switch fieldIter := iter.indexIterator.(type) {
case *eqPrefixIndexIterator:
case *indexPrefixIterator:
fieldIter.indexKey.Fields[0].Value = iter.inValues[iter.nextValIndex]
case *eqSingleIndexIterator:
fieldIter.indexKey.Fields[0].Value = iter.inValues[iter.nextValIndex]
Expand Down Expand Up @@ -238,41 +251,6 @@ func executeValueMatchers(matchers []valueMatcher, fields []core.IndexedField) (
return true, nil
}

type scanningIndexIterator struct {
queryResultIterator
indexKey core.IndexDataStoreKey
matchers []valueMatcher
execInfo *ExecInfo
}

func (iter *scanningIndexIterator) Init(ctx context.Context, store datastore.DSReaderWriter) error {
resultIter, err := store.Query(ctx, query.Query{
Prefix: iter.indexKey.ToString(),
})
if err != nil {
return err
}
iter.resultIter = resultIter

return nil
}

func (iter *scanningIndexIterator) Next() (indexIterResult, error) {
for {
res, err := iter.queryResultIterator.Next()
if err != nil || !res.foundKey {
return indexIterResult{}, err
}
iter.execInfo.IndexesFetched++

didMatch, err := executeValueMatchers(iter.matchers, res.key.Fields)

if didMatch {
return res, err
}
}
}

// checks if the value satisfies the condition
type valueMatcher interface {
Match(client.NormalValue) (bool, error)
Expand Down Expand Up @@ -468,7 +446,7 @@ func (m *anyMatcher) Match(client.NormalValue) (bool, error) { return true, nil
func (f *IndexFetcher) newPrefixIndexIterator(
fieldConditions []fieldFilterCond,
matchers []valueMatcher,
) (*eqPrefixIndexIterator, error) {
) (*indexPrefixIterator, error) {
keyFieldValues := make([]client.NormalValue, 0, len(fieldConditions))
for i := range fieldConditions {
if fieldConditions[i].op != opEq {
Expand All @@ -488,16 +466,21 @@ func (f *IndexFetcher) newPrefixIndexIterator(

key := f.newIndexDataStoreKeyWithValues(keyFieldValues)

return &eqPrefixIndexIterator{
queryResultIterator: f.newQueryResultIterator(),
indexKey: key,
execInfo: &f.execInfo,
matchers: matchers,
}, nil
return f.newQueryResultIterator(key, matchers, &f.execInfo), nil
}

func (f *IndexFetcher) newQueryResultIterator() queryResultIterator {
return queryResultIterator{indexDesc: f.indexDesc, indexedFields: f.indexedFields}
func (f *IndexFetcher) newQueryResultIterator(
indexKey core.IndexDataStoreKey,
matchers []valueMatcher,
execInfo *ExecInfo,
) *indexPrefixIterator {
return &indexPrefixIterator{
indexDesc: f.indexDesc,
indexedFields: f.indexedFields,
indexKey: indexKey,
matchers: matchers,
execInfo: execInfo,
}
}

// newInIndexIterator creates a new inIndexIterator for fetching indexed data.
Expand Down Expand Up @@ -537,12 +520,7 @@ func (f *IndexFetcher) newInIndexIterator(
indexKey := f.newIndexDataStoreKey()
indexKey.Fields = []core.IndexedField{{Descending: f.indexDesc.Fields[0].Descending}}

iter = &eqPrefixIndexIterator{
queryResultIterator: f.newQueryResultIterator(),
indexKey: indexKey,
execInfo: &f.execInfo,
matchers: matchers,
}
iter = f.newQueryResultIterator(indexKey, matchers, &f.execInfo)
}
return &inIndexIterator{
indexIterator: iter,
Expand Down Expand Up @@ -600,12 +578,7 @@ func (f *IndexFetcher) createIndexIterator() (indexIterator, error) {
case opIn:
return f.newInIndexIterator(fieldConditions, matchers)
case opGt, opGe, opLt, opLe, opNe, opNin, opLike, opNlike, opILike, opNILike:
return &scanningIndexIterator{
queryResultIterator: f.newQueryResultIterator(),
indexKey: f.newIndexDataStoreKey(),
matchers: matchers,
execInfo: &f.execInfo,
}, nil
return f.newQueryResultIterator(f.newIndexDataStoreKey(), matchers, &f.execInfo), nil
}

return nil, NewErrInvalidFilterOperator(fieldConditions[0].op)
Expand Down
12 changes: 6 additions & 6 deletions internal/planner/multi.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 Democratized Data Foundation
// Copyright 2024 Democratized Data Foundation
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
Expand Down Expand Up @@ -214,22 +214,22 @@ func (s *selectNode) addSubPlan(fieldIndex int, newPlan planNode) error {
if err := s.planner.walkAndReplacePlan(s.source, origScan, multiscan); err != nil {
return err
}
// create multinode
multinode := &parallelNode{
// create parallelNode
parallelNode := &parallelNode{
p: s.planner,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

praise: I forgot to add to my earlier review, thanks for renaming this.

multiscan: multiscan,
docMapper: docMapper{s.source.DocumentMap()},
}
multinode.addChild(-1, s.source)
parallelNode.addChild(-1, s.source)
multiscan.addReader()
// replace our new node internal scanNode with our new multiscanner
if err := s.planner.walkAndReplacePlan(newPlan, origScan, multiscan); err != nil {
return err
}
// add our newly updated plan to the multinode
multinode.addChild(fieldIndex, newPlan)
parallelNode.addChild(fieldIndex, newPlan)
multiscan.addReader()
s.source = multinode
s.source = parallelNode

// we already have an existing parallelNode as our source
case *parallelNode:
Expand Down
69 changes: 52 additions & 17 deletions internal/planner/scan.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 Democratized Data Foundation
// Copyright 2024 Democratized Data Foundation
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
Expand Down Expand Up @@ -322,38 +322,70 @@ func (p *Planner) Scan(
// If we have two readers on our multiScanNode, then
// we call Next() on the underlying scanNode only
// once every 2 Next() calls on the multiScan
//
// NOTE: calling Init() on multiScanNode is subject to counting as well and as such
does not provide idempotency guarantees. Counting is purely for performance
// reasons and removing it should be safe.
type multiScanNode struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

praise: This is very clear, thanks Islam :)

scanNode *scanNode
numReaders int
numCalls int
nextCount int
initCount int
startCount int
closeCount int

lastBool bool
lastErr error
nextResult bool
err error
}

// Init initializes the multiScanNode.
// NOTE: this function is subject to counting based on the number of readers
// and as such does not provide idempotency guarantees. Counting is purely for
// performance reasons and removing it should be safe.
func (n *multiScanNode) Init() error {
	n.countAndCall(&n.initCount, func() error {
		return n.scanNode.Init()
	})
	return n.err
}

// Start starts the underlying scanNode, subject to the same per-reader
// counting as Init (see countAndCall).
func (n *multiScanNode) Start() error {
	n.countAndCall(&n.startCount, func() error {
		return n.scanNode.Start()
	})
	return n.err
}

// Next only calls Next() on the underlying
// scanNode every numReaders.
func (n *multiScanNode) Next() (bool, error) {
if n.numCalls == 0 {
n.lastBool, n.lastErr = n.scanNode.Next()
// countAndCall keeps track of number of requests to call a given function by checking a
// function's count.
// The function is only called when the count is 0.
// If the count is equal to the number of readers, the count is reset.
// If the function returns an error, the error is stored in the multiScanNode.
func (n *multiScanNode) countAndCall(count *int, f func() error) {
if *count == 0 {
err := f()
if err != nil {
n.err = err
}
}
n.numCalls++
*count++

// if the number of calls equals the numbers of readers
// reset the counter, so our next call actually executes the Next()
if n.numCalls == n.numReaders {
n.numCalls = 0
// reset the counter, so our next call actually executes the function
if *count == n.numReaders {
*count = 0
}
}

// Next only calls Next() on the underlying scanNode once every numReaders
// calls, caching the result (and any error) for the remaining readers.
func (n *multiScanNode) Next() (bool, error) {
	n.countAndCall(&n.nextCount, func() (err error) {
		n.nextResult, err = n.scanNode.Next()
		return
	})

	return n.nextResult, n.err
}

func (n *multiScanNode) Value() core.Doc {
Expand All @@ -373,7 +405,10 @@ func (n *multiScanNode) Kind() string {
}

// Close closes the underlying scanNode, subject to the same per-reader
// counting as the other methods (see countAndCall).
func (n *multiScanNode) Close() error {
	n.countAndCall(&n.closeCount, func() error {
		return n.scanNode.Close()
	})
	return n.err
}

func (n *multiScanNode) DocumentMap() *core.DocumentMapping {
Expand Down
Loading
Loading