storage: check error on engine close
When the crdb_test build flag is provided, fatal the process if an engine Close
returns an error. This ensures unit tests and the like observe these errors,
especially those related to leaked iterators.

Close #71481.

Release justification: low-risk bug fixes and non-production code changes
Release note: None
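
To illustrate the failure mode described above, here is a hypothetical test — not part of this commit — built from the storage helpers used elsewhere in this diff; under the crdb_test build tag, the leaked iterator makes eng.Close() fatal the process instead of silently dropping Pebble's error:

package storage_test

import (
	"testing"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/storage"
)

// TestLeakedIterator is a sketch, not a real CockroachDB test: the iterator
// is intentionally never closed, so the engine's Close observes the leak.
func TestLeakedIterator(t *testing.T) {
	eng := storage.NewDefaultInMemForTesting()
	// With crdb_test set, Close now fatals on error, failing the run loudly.
	defer eng.Close()

	iter := eng.NewMVCCIterator(storage.MVCCKeyIterKind, storage.IterOptions{
		UpperBound: roachpb.KeyMax,
	})
	_ = iter // bug under test: missing iter.Close()
}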
jbowens committed Sep 13, 2022
1 parent 287836d commit e4a8a43
Showing 7 changed files with 68 additions and 21 deletions.
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go
@@ -921,6 +921,7 @@ func TestEvalAddSSTable(t *testing.T) {
// Write initial data.
intentTxn := roachpb.MakeTransaction("intentTxn", nil, 0, hlc.Timestamp{WallTime: intentTS * 1e9}, 0, 1)
b := engine.NewBatch()
defer b.Close()
for i := len(tc.data) - 1; i >= 0; i-- { // reverse, older timestamps first
switch kv := tc.data[i].(type) {
case storage.MVCCKeyValue:
@@ -1135,6 +1136,7 @@ func TestEvalAddSSTableRangefeed(t *testing.T) {
engine := storage.NewDefaultInMemForTesting()
defer engine.Close()
opLogger := storage.NewOpLoggerBatch(engine.NewBatch())
defer opLogger.Close()

// Build and add SST.
sst, start, end := storageutils.MakeSST(t, st, tc.sst)
11 changes: 10 additions & 1 deletion pkg/kv/kvserver/loqrecovery/recovery_env_test.go
@@ -416,7 +416,9 @@ func (e *quorumRecoveryEnv) getOrCreateStore(
wrapped := e.stores[storeID]
if wrapped.nodeID == 0 {
var err error
eng, err := storage.Open(ctx, storage.InMemory(), storage.CacheSize(1<<20 /* 1 MiB */))
eng, err := storage.Open(ctx,
storage.InMemory(),
storage.CacheSize(1<<20 /* 1 MiB */))
if err != nil {
t.Fatalf("failed to crate in mem store: %v", err)
}
@@ -574,6 +576,13 @@ func (e *quorumRecoveryEnv) handleApplyPlan(t *testing.T, d datadriven.TestData)
ctx := context.Background()
stores := e.parseStoresArg(t, d, true /* defaultToAll */)
nodes := e.groupStoresByNodeStore(t, stores)
defer func() {
for _, storeBatches := range nodes {
for _, b := range storeBatches {
b.Close()
}
}
}()
updateTime := timeutil.Now()
for nodeID, stores := range nodes {
_, err := PrepareUpdateReplicas(ctx, e.plan, uuid.DefaultGenerator, updateTime, nodeID, stores)
10 changes: 8 additions & 2 deletions pkg/kv/kvserver/mvcc_gc_queue_test.go
@@ -525,8 +525,8 @@ func TestFullRangeDeleteHeuristic(t *testing.T) {

rangeMs := ms
pointMs := ms
deleteWithTombstone(eng.NewBatch(), deletionTime, &rangeMs)
deleteWithPoints(eng.NewBatch(), deletionTime, &pointMs)
withBatch(eng, func(b storage.Batch) { deleteWithTombstone(b, deletionTime, &rangeMs) })
withBatch(eng, func(b storage.Batch) { deleteWithPoints(b, deletionTime, &pointMs) })

gcTTL := time.Minute * 30
for _, d := range []struct {
@@ -549,6 +549,12 @@
}
}

func withBatch(eng storage.Engine, fn func(b storage.Batch)) {
b := eng.NewBatch()
defer b.Close()
fn(b)
}

// TestMVCCGCQueueProcess creates test data in the range over various time
// scales and verifies that scan queue process properly GCs test data.
func TestMVCCGCQueueProcess(t *testing.T) {
1 change: 1 addition & 0 deletions pkg/storage/BUILD.bazel
@@ -63,6 +63,7 @@ go_library(
"//pkg/storage/fs",
"//pkg/util",
"//pkg/util/bufalloc",
"//pkg/util/buildutil",
"//pkg/util/encoding",
"//pkg/util/envutil",
"//pkg/util/hlc",
3 changes: 3 additions & 0 deletions pkg/storage/intent_interleaving_iter_test.go
@@ -840,6 +840,7 @@ func BenchmarkIntentInterleavingIterNext(b *testing.B) {
} else {
iter = state.eng.NewMVCCIterator(MVCCKeyIterKind, opts)
}
defer iter.Close()
startKey := MVCCKey{Key: state.keyPrefix}
iter.SeekGE(startKey)
b.ResetTimer()
@@ -878,6 +879,7 @@ func BenchmarkIntentInterleavingIterPrev(b *testing.B) {
} else {
iter = state.eng.NewMVCCIterator(MVCCKeyIterKind, opts)
}
defer iter.Close()
iter.SeekLT(endKey)
b.ResetTimer()
var unsafeKey MVCCKey
@@ -920,6 +922,7 @@ func BenchmarkIntentInterleavingSeekGEAndIter(b *testing.B) {
} else {
iter = state.eng.NewMVCCIterator(MVCCKeyIterKind, opts)
}
defer iter.Close()
b.ResetTimer()
for i := 0; i < b.N; i++ {
j := i % len(seekKeys)
31 changes: 17 additions & 14 deletions pkg/storage/mvcc_incremental_iterator_test.go
@@ -1619,6 +1619,7 @@ func BenchmarkMVCCIncrementalIteratorForOldData(b *testing.B) {
b.Fatal(err)
}
}
batch.Close()
if err := eng.Flush(); err != nil {
b.Fatal(err)
}
@@ -1635,21 +1636,23 @@
endKey := roachpb.Key(encoding.EncodeUvarintAscending([]byte("key-"), uint64(numKeys)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
it := NewMVCCIncrementalIterator(eng, MVCCIncrementalIterOptions{
EndKey: endKey,
StartTime: hlc.Timestamp{},
EndTime: hlc.Timestamp{WallTime: baseTimestamp},
})
it.SeekGE(MVCCKey{Key: startKey})
for {
if ok, err := it.Valid(); err != nil {
b.Fatalf("failed incremental iteration: %+v", err)
} else if !ok {
break
func() {
it := NewMVCCIncrementalIterator(eng, MVCCIncrementalIterOptions{
EndKey: endKey,
StartTime: hlc.Timestamp{},
EndTime: hlc.Timestamp{WallTime: baseTimestamp},
})
defer it.Close()
it.SeekGE(MVCCKey{Key: startKey})
for {
if ok, err := it.Valid(); err != nil {
b.Fatalf("failed incremental iteration: %+v", err)
} else if !ok {
break
}
it.Next()
}
it.Next()
}
it.Close()
}()
}
})
eng.Close()
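
A note on the rewrite above: wrapping the loop body in an anonymous function scopes defer it.Close() to a single benchmark iteration; a bare defer inside the loop would queue b.N closes that all run only when the benchmark returns. A self-contained sketch of the pattern, with hypothetical names:

package example

import "testing"

// resource stands in for anything with a Close method (iterators, batches).
type resource struct{}

func (resource) Close() {}

func newResource() resource { return resource{} } // hypothetical constructor

func BenchmarkScopedDefer(b *testing.B) {
	for i := 0; i < b.N; i++ {
		func() {
			r := newResource()
			defer r.Close() // fires at the end of this iteration
			_ = r           // ... exercise the resource here ...
		}()
	}
}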
31 changes: 27 additions & 4 deletions pkg/storage/pebble.go
@@ -34,6 +34,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/storage/fs"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -684,6 +685,7 @@ type Pebble struct {
// Relevant options copied over from pebble.Options.
fs vfs.FS
unencryptedFS vfs.FS
logCtx context.Context
logger pebble.Logger
eventListener *pebble.EventListener
mu struct {
@@ -896,6 +898,7 @@ func NewPebble(ctx context.Context, cfg PebbleConfig) (p *Pebble, err error) {
fs: cfg.Opts.FS,
unencryptedFS: unencryptedFS,
logger: cfg.Opts.Logger,
logCtx: logCtx,
storeIDPebbleLog: storeIDContainer,
closer: filesystemCloser,
}
@@ -1038,15 +1041,35 @@ func (p *Pebble) Close() {
return
}
p.closed = true
_ = p.db.Close()

handleErr := func(err error) {
if err == nil {
return
}
// Allow unclean close in production builds for now. We refrain from
// Fatal-ing on an unclean close because Cockroach opens and closes
// ephemeral engines at times, and an error in those codepaths should not
// fatal the process.
//
// TODO(jackson): Propagate the error to call sites without fataling:
// This is tricky, because the Reader interface requires Close return
// nothing.
if buildutil.CrdbTestBuild {
log.Fatalf(p.logCtx, "error during engine close: %s\n", err)
} else {
log.Errorf(p.logCtx, "error during engine close: %s\n", err)
}
}

handleErr(p.db.Close())
if p.fileRegistry != nil {
_ = p.fileRegistry.Close()
handleErr(p.fileRegistry.Close())
}
if p.encryption != nil {
_ = p.encryption.Closer.Close()
handleErr(p.encryption.Closer.Close())
}
if p.closer != nil {
_ = p.closer.Close()
handleErr(p.closer.Close())
}
}

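
On the gate itself: buildutil.CrdbTestBuild — the dependency added to BUILD.bazel above — is a compile-time boolean selected by the crdb_test build tag, so the Fatalf branch is dead code in production binaries. A minimal sketch of that pattern, assuming two files along these lines (names and comments are illustrative, not the actual buildutil source):

// crdb_test_on.go — compiled only when the crdb_test tag is set.
//go:build crdb_test

package buildutil

// CrdbTestBuild gates test-only assertions like the fatal above.
const CrdbTestBuild = true

// crdb_test_off.go — the production default.
//go:build !crdb_test

package buildutil

const CrdbTestBuild = false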
