diff --git a/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go b/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go index fab204654c68..db8080b591a0 100644 --- a/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go +++ b/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go @@ -900,7 +900,9 @@ func TestEvalAddSSTable(t *testing.T) { batcheval.AddSSTableRewriteConcurrency.Override(ctx, &st.SV, int64(c.(int))) batcheval.AddSSTableRequireAtRequestTimestamp.Override(ctx, &st.SV, tc.requireReqTS) - engine := storage.NewDefaultInMemForTesting() + // TODO(jackson): Track down the iterator leak and + // remove the storage.LeaksIteratorsTODO option. + engine := storage.NewDefaultInMemForTesting(storage.LeaksIteratorsTODO) defer engine.Close() // Write initial data. @@ -1070,7 +1072,10 @@ func TestEvalAddSSTableRangefeed(t *testing.T) { st := cluster.MakeTestingClusterSettings() ctx := context.Background() - engine := storage.NewDefaultInMemForTesting() + // TODO(jackson): This test leaks Pebble iterators. There are few + // opportunities to leak iterators in test-only code, so this is + // likely a real iterator leak. + engine := storage.NewDefaultInMemForTesting(storage.LeaksIteratorsTODO) defer engine.Close() opLogger := storage.NewOpLoggerBatch(engine.NewBatch()) diff --git a/pkg/kv/kvserver/loqrecovery/recovery_env_test.go b/pkg/kv/kvserver/loqrecovery/recovery_env_test.go index 99d0b7ac6f7c..07a18a3448c9 100644 --- a/pkg/kv/kvserver/loqrecovery/recovery_env_test.go +++ b/pkg/kv/kvserver/loqrecovery/recovery_env_test.go @@ -416,7 +416,10 @@ func (e *quorumRecoveryEnv) getOrCreateStore( wrapped := e.stores[storeID] if wrapped.nodeID == 0 { var err error - eng, err := storage.Open(ctx, storage.InMemory(), storage.CacheSize(1<<20 /* 1 MiB */)) + eng, err := storage.Open(ctx, + storage.InMemory(), + storage.CacheSize(1<<20 /* 1 MiB */), + storage.LeaksIteratorsTODO) if err != nil { t.Fatalf("failed to crate in mem store: %v", err) } diff --git a/pkg/kv/kvserver/mvcc_gc_queue_test.go b/pkg/kv/kvserver/mvcc_gc_queue_test.go index 4f5eae5c1c8c..0a3d4350489c 100644 --- a/pkg/kv/kvserver/mvcc_gc_queue_test.go +++ b/pkg/kv/kvserver/mvcc_gc_queue_test.go @@ -478,7 +478,7 @@ func TestFullRangeDeleteHeuristic(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) ctx := context.Background() - eng := storage.NewDefaultInMemForTesting() + eng := storage.NewDefaultInMemForTesting(storage.LeaksIteratorsTODO) defer eng.Close() rng, _ := randutil.NewTestRand() diff --git a/pkg/storage/engine.go b/pkg/storage/engine.go index 4996ef5286b2..3b88a573e97b 100644 --- a/pkg/storage/engine.go +++ b/pkg/storage/engine.go @@ -865,6 +865,9 @@ type Engine interface { // NewBatch returns a new instance of a batched engine which wraps // this engine. Batched engines accumulate all mutations and apply // them atomically on a call to Commit(). + // + // It is necessary to close a batch in order to release a batch's + // cached iterators. NewBatch() Batch // NewReadOnly returns a new instance of a ReadWriter that wraps this // engine, and with the given durability requirement. This wrapper panics diff --git a/pkg/storage/intent_interleaving_iter_test.go b/pkg/storage/intent_interleaving_iter_test.go index 0929d58010a2..6c5841a9f4cb 100644 --- a/pkg/storage/intent_interleaving_iter_test.go +++ b/pkg/storage/intent_interleaving_iter_test.go @@ -840,6 +840,7 @@ func BenchmarkIntentInterleavingIterNext(b *testing.B) { } else { iter = state.eng.NewMVCCIterator(MVCCKeyIterKind, opts) } + defer iter.Close() startKey := MVCCKey{Key: state.keyPrefix} iter.SeekGE(startKey) b.ResetTimer() @@ -878,6 +879,7 @@ func BenchmarkIntentInterleavingIterPrev(b *testing.B) { } else { iter = state.eng.NewMVCCIterator(MVCCKeyIterKind, opts) } + defer iter.Close() iter.SeekLT(endKey) b.ResetTimer() var unsafeKey MVCCKey @@ -920,6 +922,7 @@ func BenchmarkIntentInterleavingSeekGEAndIter(b *testing.B) { } else { iter = state.eng.NewMVCCIterator(MVCCKeyIterKind, opts) } + defer iter.Close() b.ResetTimer() for i := 0; i < b.N; i++ { j := i % len(seekKeys) diff --git a/pkg/storage/mvcc_incremental_iterator_test.go b/pkg/storage/mvcc_incremental_iterator_test.go index eb00b2e2f1b5..cfef710dac3c 100644 --- a/pkg/storage/mvcc_incremental_iterator_test.go +++ b/pkg/storage/mvcc_incremental_iterator_test.go @@ -1619,6 +1619,7 @@ func BenchmarkMVCCIncrementalIteratorForOldData(b *testing.B) { b.Fatal(err) } } + batch.Close() if err := eng.Flush(); err != nil { b.Fatal(err) } @@ -1635,21 +1636,23 @@ func BenchmarkMVCCIncrementalIteratorForOldData(b *testing.B) { endKey := roachpb.Key(encoding.EncodeUvarintAscending([]byte("key-"), uint64(numKeys))) b.ResetTimer() for i := 0; i < b.N; i++ { - it := NewMVCCIncrementalIterator(eng, MVCCIncrementalIterOptions{ - EndKey: endKey, - StartTime: hlc.Timestamp{}, - EndTime: hlc.Timestamp{WallTime: baseTimestamp}, - }) - it.SeekGE(MVCCKey{Key: startKey}) - for { - if ok, err := it.Valid(); err != nil { - b.Fatalf("failed incremental iteration: %+v", err) - } else if !ok { - break + func() { + it := NewMVCCIncrementalIterator(eng, MVCCIncrementalIterOptions{ + EndKey: endKey, + StartTime: hlc.Timestamp{}, + EndTime: hlc.Timestamp{WallTime: baseTimestamp}, + }) + defer it.Close() + it.SeekGE(MVCCKey{Key: startKey}) + for { + if ok, err := it.Valid(); err != nil { + b.Fatalf("failed incremental iteration: %+v", err) + } else if !ok { + break + } + it.Next() } - it.Next() - } - it.Close() + }() } }) eng.Close() diff --git a/pkg/storage/open.go b/pkg/storage/open.go index 3d6da78711d1..9a13ca869c4d 100644 --- a/pkg/storage/open.go +++ b/pkg/storage/open.go @@ -159,6 +159,13 @@ func EncryptionAtRest(encryptionOptions []byte) ConfigOption { } } +// LeaksIteratorsTODO is a temporary option that is used in existing tests that +// leak iterators and must be updated. See #71481. +var LeaksIteratorsTODO ConfigOption = func(cfg *engineConfig) error { + cfg.allowUncleanClose = true + return nil +} + // Hook configures a hook to initialize additional storage options. It's used // to initialize encryption-at-rest details in CCL builds. func Hook(hookFunc func(*base.StorageConfig) error) ConfigOption { @@ -202,6 +209,9 @@ type engineConfig struct { // a ref count of 1, so creating the Cache during execution of // ConfigOption makes it too easy to leak a cache. cacheSize *int64 + // allowUncleanClose is a temporary option that configures a *Pebble to not + // panic if DB.Close returns a non-nil error. + allowUncleanClose bool } // Open opens a new Pebble storage engine, reading and writing data to the @@ -227,6 +237,7 @@ func Open(ctx context.Context, loc Location, opts ...ConfigOption) (*Pebble, err if err != nil { return nil, err } + p.allowUncleanClose = cfg.allowUncleanClose // Set the active cluster version, ensuring the engine's format // major version is ratcheted sufficiently high to match the // settings cluster version. diff --git a/pkg/storage/pebble.go b/pkg/storage/pebble.go index 3df59932bd06..90881415bad5 100644 --- a/pkg/storage/pebble.go +++ b/pkg/storage/pebble.go @@ -705,6 +705,11 @@ type Pebble struct { wrappedIntentWriter intentDemuxWriter storeIDPebbleLog *base.StoreIDContainer + + // allowUncleanClose is a temporary field until #71481 is fixed. Existing + // tests that leak iterators set this field so that Pebble.Close does not + // panic when it observes leaked iterators. + allowUncleanClose bool } // EncryptionEnv describes the encryption-at-rest environment, providing @@ -1038,15 +1043,24 @@ func (p *Pebble) Close() { return } p.closed = true - _ = p.db.Close() + err := p.db.Close() + if !p.allowUncleanClose && err != nil { + panic(err) + } if p.fileRegistry != nil { - _ = p.fileRegistry.Close() + if err := p.fileRegistry.Close(); !p.allowUncleanClose && err != nil { + panic(err) + } } if p.encryption != nil { - _ = p.encryption.Closer.Close() + if err := p.encryption.Closer.Close(); !p.allowUncleanClose && err != nil { + panic(err) + } } if p.closer != nil { - _ = p.closer.Close() + if err := p.closer.Close(); !p.allowUncleanClose && err != nil { + panic(err) + } } }