Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check if segments require background GC before compacting #3695

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions src/dbnode/storage/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,16 @@ func (entry *Entry) IfAlreadyIndexedMarkIndexSuccessAndFinalize(
// TryMarkIndexGarbageCollected checks if the entry is eligible to be garbage collected
// from the index. If so, it marks the entry as GCed and returns true. Otherwise returns false.
func (entry *Entry) TryMarkIndexGarbageCollected() bool {
return entry.checkNeedsIndexGarbageCollected(true)
}

// NeedsIndexGarbageCollected checks if the entry is eligible to be garbage collected
// from the index. If so, it marks the entry as GCed and returns true. Otherwise returns false.
func (entry *Entry) NeedsIndexGarbageCollected() bool {
return entry.checkNeedsIndexGarbageCollected(false)
}

func (entry *Entry) checkNeedsIndexGarbageCollected(mark bool) bool {
// Since series insertions + index insertions are done separately async, it is possible for
// a series to be in the index but not have data written yet, and so any series not in the
// lookup yet we cannot yet consider empty.
Expand All @@ -255,8 +265,10 @@ func (entry *Entry) TryMarkIndexGarbageCollected() bool {
return false
}

// Mark as GCed from index so the entry can be safely cleaned up elsewhere.
entry.IndexGarbageCollected.Store(true)
if mark {
// Mark as GCed from index so the entry can be safely cleaned up elsewhere.
entry.IndexGarbageCollected.Store(true)
}

return true
}
Expand Down
84 changes: 71 additions & 13 deletions src/dbnode/storage/index/mutable_segments.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package index
import (
"errors"
"fmt"
"github.com/m3db/m3/src/m3ninx/x"
"math"
"runtime"
"sync"
Expand Down Expand Up @@ -82,6 +83,8 @@ type mutableSegments struct {
optsListener xresource.SimpleCloser
writeIndexingConcurrency int

seriesActiveFn segment.DocumentsFilter

metrics mutableSegmentsMetrics
logger *zap.Logger
}
Expand Down Expand Up @@ -135,6 +138,7 @@ func newMutableSegments(
metrics: newMutableSegmentsMetrics(iopts.MetricsScope()),
logger: iopts.Logger(),
}
m.seriesActiveFn = segment.DocumentsFilterFn(m.seriesActive)
m.optsListener = namespaceRuntimeOptsMgr.RegisterListener(m)
return m
}
Expand All @@ -161,6 +165,19 @@ func (m *mutableSegments) SetNamespaceRuntimeOptions(opts namespace.RuntimeOptio
builder.SetSortConcurrency(m.writeIndexingConcurrency)
}

func (m *mutableSegments) seriesActive(d doc.Metadata) bool {
// Filter out any documents that only were indexed for
// sealed blocks.
if d.OnIndexSeries == nil {
instrument.EmitAndLogInvariantViolation(m.iopts, func(l *zap.Logger) {
l.Error("unexpected nil for document index entry for background compact")
})
return true
}

return !d.OnIndexSeries.TryMarkIndexGarbageCollected()
}

func (m *mutableSegments) WriteBatch(inserts *WriteBatch) (MutableSegmentsStats, error) {
m.Lock()
if m.state == mutableSegmentsStateClosed {
Expand Down Expand Up @@ -397,7 +414,6 @@ func (m *mutableSegments) backgroundCompactWithLock() {
gcPlan = &compaction.Plan{}
gcAlreadyRunning = m.compact.compactingBackgroundGarbageCollect
)

if !gcAlreadyRunning {
gcRequired = true

Expand All @@ -416,6 +432,20 @@ func (m *mutableSegments) backgroundCompactWithLock() {
continue
}

// Ensure that segment has some series that need to be GC'd.
hasAnyInactiveSeries, err := m.segmentAnyInactiveSeries(seg.Segment())
if err != nil {
instrument.EmitAndLogInvariantViolation(m.iopts, func(l *zap.Logger) {
l.Error("error detecting needs background gc segment", zap.Error(err))
})
continue
}
if !hasAnyInactiveSeries {
// Skip background GC since all series are still active and no
// series need to be removed.
continue
}

// The active block starts are outdated, need to compact
// and remove any old data from the segment.
var task compaction.Task
Expand Down Expand Up @@ -477,6 +507,45 @@ func (m *mutableSegments) backgroundCompactWithLock() {
}
}

func (m *mutableSegments) segmentAnyInactiveSeries(seg segment.Segment) (bool, error) {
reader, err := seg.Reader()
if err != nil {
return false, err
}

defer reader.Close()

docs, err := reader.AllDocs()
if err != nil {
return false, err
}

docsCloser := x.NewSafeCloser(docs)
defer func() {
// In case of early return cleanup
_ = docsCloser.Close()
}()

var result bool
for docs.Next() {
d := docs.Current()
indexEntry := d.OnIndexSeries
if indexEntry == nil {
return false, fmt.Errorf("document has no index entry: %s", d.ID)
}
if indexEntry.NeedsIndexGarbageCollected() {
result = true
break
}
}

if err := docs.Err(); err != nil {
return false, err
}

return result, docsCloser.Close()
}

func (m *mutableSegments) shouldEvictCompactedSegmentsWithLock() bool {
return m.state == mutableSegmentsStateClosed
}
Expand Down Expand Up @@ -608,18 +677,7 @@ func (m *mutableSegments) backgroundCompactWithTask(
var documentsFilter segment.DocumentsFilter
if gcRequired {
// Only actively filter out documents if GC is required.
documentsFilter = segment.DocumentsFilterFn(func(d doc.Metadata) bool {
// Filter out any documents that only were indexed for
// sealed blocks.
if d.OnIndexSeries == nil {
instrument.EmitAndLogInvariantViolation(m.iopts, func(l *zap.Logger) {
l.Error("unexpected nil for document index entry for background compact")
})
return true
}

return !d.OnIndexSeries.TryMarkIndexGarbageCollected()
})
documentsFilter = m.seriesActiveFn
}

start := time.Now()
Expand Down
14 changes: 14 additions & 0 deletions src/m3ninx/doc/doc_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions src/m3ninx/doc/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ type OnIndexSeries interface {
// from the index. If so, it marks the entry as GCed and returns true. Otherwise returns false.
TryMarkIndexGarbageCollected() bool

// NeedsIndexGarbageCollected returns if the entry is eligible to be garbage collected
// from the index.
NeedsIndexGarbageCollected() bool

// IndexedForBlockStart returns true if the blockStart has been indexed.
IndexedForBlockStart(blockStart xtime.UnixNano) bool
}