Skip to content

Commit

Permalink
Move invariant violation check to index/block.go
Browse files Browse the repository at this point in the history
  • Loading branch information
wesleyk committed Feb 9, 2021
1 parent 46a2387 commit d2f9178
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 26 deletions.
8 changes: 6 additions & 2 deletions src/dbnode/storage/index/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,11 @@ func (b *block) queryWithSpan(
func (b *block) closeAsync(closer io.Closer) {
if err := closer.Close(); err != nil {
// Note: This only happens if closing the readers isn't clean.
b.logger.Error("could not close query index block resource", zap.Error(err))
instrument.EmitAndLogInvariantViolation(
b.iopts,
func(l *zap.Logger) {
l.Error("could not close query index block resource", zap.Error(err))
})
}
}

Expand Down Expand Up @@ -644,7 +648,7 @@ func (b *block) aggregateWithSpan(
if len(aggOpts.FieldFilter) == 0 {
return r.FieldsPostingsList()
}
return newFilterFieldsIterator(b.iopts, r, aggOpts.FieldFilter)
return newFilterFieldsIterator(r, aggOpts.FieldFilter)
},
}

Expand Down
16 changes: 1 addition & 15 deletions src/dbnode/storage/index/filter_fields_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,15 @@ import (
"bytes"
"errors"

"go.uber.org/zap"

"github.com/m3db/m3/src/m3ninx/index/segment"
"github.com/m3db/m3/src/m3ninx/postings"
"github.com/m3db/m3/src/x/instrument"
)

var (
errNoFiltersSpecified = errors.New("no fields specified to filter upon")
)

func newFilterFieldsIterator(
iOpts instrument.Options,
reader segment.Reader,
fields AggregateFieldFilter,
) (segment.FieldsPostingsListIterator, error) {
Expand All @@ -48,7 +44,6 @@ func newFilterFieldsIterator(
return nil, err
}
return &filterFieldsIterator{
iOpts: iOpts,
reader: reader,
fieldsIter: fieldsIter,
fields: fields,
Expand All @@ -57,7 +52,6 @@ func newFilterFieldsIterator(
}

type filterFieldsIterator struct {
iOpts instrument.Options
reader segment.Reader
fieldsIter segment.FieldsPostingsListIterator
fields AggregateFieldFilter
Expand Down Expand Up @@ -100,13 +94,5 @@ func (f *filterFieldsIterator) Err() error {
}

func (f *filterFieldsIterator) Close() error {
if err := f.fieldsIter.Close(); err != nil {
instrument.EmitAndLogInvariantViolation(
f.iOpts,
func(l *zap.Logger) {
l.Error("error when closing filterFieldsIterator", zap.Error(err))
})
return err
}
return nil
return f.fieldsIter.Close()
}
16 changes: 7 additions & 9 deletions src/dbnode/storage/index/filter_fields_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ package index
import (
"testing"

"github.com/m3db/m3/src/x/instrument"

"github.com/m3db/m3/src/m3ninx/index/segment"
xtest "github.com/m3db/m3/src/x/test"

Expand All @@ -36,7 +34,7 @@ func TestNewFilterFieldsIteratorError(t *testing.T) {
defer ctrl.Finish()

r := segment.NewMockReader(ctrl)
_, err := newFilterFieldsIterator(instrument.NewTestOptions(t), r, nil)
_, err := newFilterFieldsIterator(r, nil)
require.Error(t, err)
}

Expand All @@ -47,7 +45,7 @@ func TestNewFilterFieldsIteratorNoMatchesInSegment(t *testing.T) {
filters := AggregateFieldFilter{[]byte("a"), []byte("b")}
reader := newMockSegmentReader(ctrl, map[string]terms{})

iter, err := newFilterFieldsIterator(instrument.NewTestOptions(t), reader, filters)
iter, err := newFilterFieldsIterator(reader, filters)
require.NoError(t, err)

require.False(t, iter.Next())
Expand All @@ -62,7 +60,7 @@ func TestNewFilterFieldsIteratorFirstMatch(t *testing.T) {
filters := AggregateFieldFilter{[]byte("a"), []byte("b"), []byte("c")}
reader := newMockSegmentReader(ctrl, map[string]terms{"a": {}})

iter, err := newFilterFieldsIterator(instrument.NewTestOptions(t), reader, filters)
iter, err := newFilterFieldsIterator(reader, filters)
require.NoError(t, err)

require.True(t, iter.Next())
Expand All @@ -80,7 +78,7 @@ func TestNewFilterFieldsIteratorMiddleMatch(t *testing.T) {
filters := AggregateFieldFilter{[]byte("a"), []byte("b"), []byte("c")}
reader := newMockSegmentReader(ctrl, map[string]terms{"d": {}, "b": {}, "e": {}})

iter, err := newFilterFieldsIterator(instrument.NewTestOptions(t), reader, filters)
iter, err := newFilterFieldsIterator(reader, filters)
require.NoError(t, err)

require.True(t, iter.Next())
Expand All @@ -98,7 +96,7 @@ func TestNewFilterFieldsIteratorEndMatch(t *testing.T) {
filters := AggregateFieldFilter{[]byte("a"), []byte("b"), []byte("c")}
reader := newMockSegmentReader(ctrl, map[string]terms{"d": {}, "e": {}, "c": {}})

iter, err := newFilterFieldsIterator(instrument.NewTestOptions(t), reader, filters)
iter, err := newFilterFieldsIterator(reader, filters)
require.NoError(t, err)

require.True(t, iter.Next())
Expand All @@ -116,7 +114,7 @@ func TestNewFilterFieldsIteratorAllMatch(t *testing.T) {
filters := AggregateFieldFilter{[]byte("a"), []byte("b"), []byte("c")}
reader := newMockSegmentReader(ctrl, map[string]terms{"a": {}, "b": {}, "c": {}})

iter, err := newFilterFieldsIterator(instrument.NewTestOptions(t), reader, filters)
iter, err := newFilterFieldsIterator(reader, filters)
require.NoError(t, err)

require.True(t, iter.Next())
Expand All @@ -143,7 +141,7 @@ func TestNewFilterFieldsIteratorRandomMatch(t *testing.T) {
filters := AggregateFieldFilter{[]byte("a"), []byte("b"), []byte("c")}
reader := newMockSegmentReader(ctrl, map[string]terms{"a": {}, "c": {}})

iter, err := newFilterFieldsIterator(instrument.NewTestOptions(t), reader, filters)
iter, err := newFilterFieldsIterator(reader, filters)
require.NoError(t, err)

require.True(t, iter.Next())
Expand Down

0 comments on commit d2f9178

Please sign in to comment.