Skip to content

Commit

Permalink
storage: check error on Engine close
Browse files Browse the repository at this point in the history
Panic if an error is encountered while closing the Engine. This ensures
unit tests and the like observe errors, especially related to leaked
iterators.

Close cockroachdb#71481.

Release justification: low-risk bug fixes and non-production code changes
Release note: None
  • Loading branch information
jbowens committed Sep 8, 2022
1 parent 1a637b0 commit dba2e27
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 22 deletions.
9 changes: 7 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -900,7 +900,9 @@ func TestEvalAddSSTable(t *testing.T) {
batcheval.AddSSTableRewriteConcurrency.Override(ctx, &st.SV, int64(c.(int)))
batcheval.AddSSTableRequireAtRequestTimestamp.Override(ctx, &st.SV, tc.requireReqTS)

engine := storage.NewDefaultInMemForTesting()
// TODO(jackson): Track down the iterator leak and
// remove the storage.LeaksIteratorsTODO option.
engine := storage.NewDefaultInMemForTesting(storage.LeaksIteratorsTODO)
defer engine.Close()

// Write initial data.
Expand Down Expand Up @@ -1070,7 +1072,10 @@ func TestEvalAddSSTableRangefeed(t *testing.T) {
st := cluster.MakeTestingClusterSettings()
ctx := context.Background()

engine := storage.NewDefaultInMemForTesting()
// TODO(jackson): This test leaks Pebble iterators. There are few
// opportunities to leak iterators in test-only code, so this is
// likely a real iterator leak.
engine := storage.NewDefaultInMemForTesting(storage.LeaksIteratorsTODO)
defer engine.Close()
opLogger := storage.NewOpLoggerBatch(engine.NewBatch())

Expand Down
5 changes: 4 additions & 1 deletion pkg/kv/kvserver/loqrecovery/recovery_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,10 @@ func (e *quorumRecoveryEnv) getOrCreateStore(
wrapped := e.stores[storeID]
if wrapped.nodeID == 0 {
var err error
eng, err := storage.Open(ctx, storage.InMemory(), storage.CacheSize(1<<20 /* 1 MiB */))
eng, err := storage.Open(ctx,
storage.InMemory(),
storage.CacheSize(1<<20 /* 1 MiB */),
storage.LeaksIteratorsTODO)
if err != nil {
t.Fatalf("failed to crate in mem store: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/mvcc_gc_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ func TestFullRangeDeleteHeuristic(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()
eng := storage.NewDefaultInMemForTesting()
eng := storage.NewDefaultInMemForTesting(storage.LeaksIteratorsTODO)
defer eng.Close()

rng, _ := randutil.NewTestRand()
Expand Down
3 changes: 3 additions & 0 deletions pkg/storage/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -865,6 +865,9 @@ type Engine interface {
// NewBatch returns a new instance of a batched engine which wraps
// this engine. Batched engines accumulate all mutations and apply
// them atomically on a call to Commit().
//
// It is necessary to close a batch in order to release a batch's
// cached iterators.
NewBatch() Batch
// NewReadOnly returns a new instance of a ReadWriter that wraps this
// engine, and with the given durability requirement. This wrapper panics
Expand Down
3 changes: 3 additions & 0 deletions pkg/storage/intent_interleaving_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -840,6 +840,7 @@ func BenchmarkIntentInterleavingIterNext(b *testing.B) {
} else {
iter = state.eng.NewMVCCIterator(MVCCKeyIterKind, opts)
}
defer iter.Close()
startKey := MVCCKey{Key: state.keyPrefix}
iter.SeekGE(startKey)
b.ResetTimer()
Expand Down Expand Up @@ -878,6 +879,7 @@ func BenchmarkIntentInterleavingIterPrev(b *testing.B) {
} else {
iter = state.eng.NewMVCCIterator(MVCCKeyIterKind, opts)
}
defer iter.Close()
iter.SeekLT(endKey)
b.ResetTimer()
var unsafeKey MVCCKey
Expand Down Expand Up @@ -920,6 +922,7 @@ func BenchmarkIntentInterleavingSeekGEAndIter(b *testing.B) {
} else {
iter = state.eng.NewMVCCIterator(MVCCKeyIterKind, opts)
}
defer iter.Close()
b.ResetTimer()
for i := 0; i < b.N; i++ {
j := i % len(seekKeys)
Expand Down
31 changes: 17 additions & 14 deletions pkg/storage/mvcc_incremental_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1619,6 +1619,7 @@ func BenchmarkMVCCIncrementalIteratorForOldData(b *testing.B) {
b.Fatal(err)
}
}
batch.Close()
if err := eng.Flush(); err != nil {
b.Fatal(err)
}
Expand All @@ -1635,21 +1636,23 @@ func BenchmarkMVCCIncrementalIteratorForOldData(b *testing.B) {
endKey := roachpb.Key(encoding.EncodeUvarintAscending([]byte("key-"), uint64(numKeys)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
it := NewMVCCIncrementalIterator(eng, MVCCIncrementalIterOptions{
EndKey: endKey,
StartTime: hlc.Timestamp{},
EndTime: hlc.Timestamp{WallTime: baseTimestamp},
})
it.SeekGE(MVCCKey{Key: startKey})
for {
if ok, err := it.Valid(); err != nil {
b.Fatalf("failed incremental iteration: %+v", err)
} else if !ok {
break
func() {
it := NewMVCCIncrementalIterator(eng, MVCCIncrementalIterOptions{
EndKey: endKey,
StartTime: hlc.Timestamp{},
EndTime: hlc.Timestamp{WallTime: baseTimestamp},
})
defer it.Close()
it.SeekGE(MVCCKey{Key: startKey})
for {
if ok, err := it.Valid(); err != nil {
b.Fatalf("failed incremental iteration: %+v", err)
} else if !ok {
break
}
it.Next()
}
it.Next()
}
it.Close()
}()
}
})
eng.Close()
Expand Down
11 changes: 11 additions & 0 deletions pkg/storage/open.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,13 @@ func EncryptionAtRest(encryptionOptions []byte) ConfigOption {
}
}

// LeaksIteratorsTODO is a temporary ConfigOption for existing tests that are
// known to leak iterators. It marks the engine as tolerating an unclean close,
// preventing Pebble.Close from panicking on the leak. Tests using this option
// must be fixed and migrated off of it; see #71481.
var LeaksIteratorsTODO ConfigOption = func(c *engineConfig) error {
	c.allowUncleanClose = true
	return nil
}

// Hook configures a hook to initialize additional storage options. It's used
// to initialize encryption-at-rest details in CCL builds.
func Hook(hookFunc func(*base.StorageConfig) error) ConfigOption {
Expand Down Expand Up @@ -202,6 +209,9 @@ type engineConfig struct {
// a ref count of 1, so creating the Cache during execution of
// ConfigOption makes it too easy to leak a cache.
cacheSize *int64
// allowUncleanClose is a temporary option that configures a *Pebble to not
// panic if DB.Close returns a non-nil error.
allowUncleanClose bool
}

// Open opens a new Pebble storage engine, reading and writing data to the
Expand All @@ -227,6 +237,7 @@ func Open(ctx context.Context, loc Location, opts ...ConfigOption) (*Pebble, err
if err != nil {
return nil, err
}
p.allowUncleanClose = cfg.allowUncleanClose
// Set the active cluster version, ensuring the engine's format
// major version is ratcheted sufficiently high to match the
// settings cluster version.
Expand Down
22 changes: 18 additions & 4 deletions pkg/storage/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,11 @@ type Pebble struct {
wrappedIntentWriter intentDemuxWriter

storeIDPebbleLog *base.StoreIDContainer

// allowUncleanClose is a temporary field until #71481 is fixed. Existing
// tests that leak iterators set this field so that Pebble.Close does not
// panic when it observes leaked iterators.
allowUncleanClose bool
}

// EncryptionEnv describes the encryption-at-rest environment, providing
Expand Down Expand Up @@ -1038,15 +1043,24 @@ func (p *Pebble) Close() {
return
}
p.closed = true
_ = p.db.Close()
err := p.db.Close()
if !p.allowUncleanClose && err != nil {
panic(err)
}
if p.fileRegistry != nil {
_ = p.fileRegistry.Close()
if err := p.fileRegistry.Close(); !p.allowUncleanClose && err != nil {
panic(err)
}
}
if p.encryption != nil {
_ = p.encryption.Closer.Close()
if err := p.encryption.Closer.Close(); !p.allowUncleanClose && err != nil {
panic(err)
}
}
if p.closer != nil {
_ = p.closer.Close()
if err := p.closer.Close(); !p.allowUncleanClose && err != nil {
panic(err)
}
}
}

Expand Down

0 comments on commit dba2e27

Please sign in to comment.