Skip to content

Commit

Permalink
backend/local: fix panic while lightning quits (#33738)
Browse files Browse the repository at this point in the history
close #33524
  • Loading branch information
lichunzhu authored Apr 7, 2022
1 parent be76f88 commit 15a01fa
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 3 deletions.
16 changes: 13 additions & 3 deletions br/pkg/lightning/backend/local/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,8 +426,15 @@ func getSizeProperties(logger log.Logger, db *pebble.DB, keyAdapter KeyAdapter)
}

func (e *Engine) getEngineFileSize() backend.EngineFileSize {
metrics := e.db.Metrics()
total := metrics.Total()
e.mutex.RLock()
db := e.db
e.mutex.RUnlock()

var total pebble.LevelMetrics
if db != nil {
metrics := db.Metrics()
total = metrics.Total()
}
var memSize int64
e.localWriters.Range(func(k, v interface{}) bool {
w := k.(*Writer)
Expand Down Expand Up @@ -524,7 +531,6 @@ func (e *Engine) ingestSSTLoop() {
for i := 0; i < concurrency; i++ {
e.wg.Add(1)
go func() {
defer e.wg.Done()
defer func() {
if e.ingestErr.Get() != nil {
seqLock.Lock()
Expand All @@ -534,6 +540,7 @@ func (e *Engine) ingestSSTLoop() {
flushQueue = flushQueue[:0]
seqLock.Unlock()
}
e.wg.Done()
}()
for {
select {
Expand Down Expand Up @@ -1471,5 +1478,8 @@ func (i dbSSTIngester) ingest(metas []*sstMeta) error {
for _, m := range metas {
paths = append(paths, m.path)
}
if i.e.db == nil {
return errorEngineClosed
}
return i.e.db.Ingest(paths)
}
85 changes: 85 additions & 0 deletions br/pkg/lightning/backend/local/engine_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package local

import (
"context"
"fmt"
"math"
"os"
"path"
"path/filepath"
"testing"

"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/sstable"
"github.com/google/uuid"
"github.com/stretchr/testify/require"

"github.com/pingcap/tidb/br/pkg/lightning/backend"
)

func TestIngestSSTWithClosedEngine(t *testing.T) {
dir := t.TempDir()
opt := &pebble.Options{
MemTableSize: 1024 * 1024,
MaxConcurrentCompactions: 16,
L0CompactionThreshold: math.MaxInt32, // set to max try to disable compaction
L0StopWritesThreshold: math.MaxInt32, // set to max try to disable compaction
DisableWAL: true,
ReadOnly: false,
}
db, err := pebble.Open(filepath.Join(dir, "test"), opt)
require.NoError(t, err)
tmpPath := filepath.Join(dir, "test.sst")
err = os.Mkdir(tmpPath, 0o755)
require.NoError(t, err)

_, engineUUID := backend.MakeUUID("ww", 0)
engineCtx, cancel := context.WithCancel(context.Background())
f := &Engine{
db: db,
UUID: engineUUID,
sstDir: tmpPath,
ctx: engineCtx,
cancel: cancel,
sstMetasChan: make(chan metaOrFlush, 64),
keyAdapter: noopKeyAdapter{},
}
f.sstIngester = dbSSTIngester{e: f}
sstPath := path.Join(tmpPath, uuid.New().String()+".sst")
file, err := os.Create(sstPath)
require.NoError(t, err)
w := sstable.NewWriter(file, sstable.WriterOptions{})
for i := 0; i < 10; i++ {
require.NoError(t, w.Add(sstable.InternalKey{
Trailer: uint64(sstable.InternalKeyKindSet),
UserKey: []byte(fmt.Sprintf("key%d", i)),
}, nil))
}
require.NoError(t, w.Close())

require.NoError(t, f.ingestSSTs([]*sstMeta{
{
path: sstPath,
},
}))
require.NoError(t, f.Close())
require.ErrorIs(t, f.ingestSSTs([]*sstMeta{
{
path: sstPath,
},
}), errorEngineClosed)
}

0 comments on commit 15a01fa

Please sign in to comment.