Commit 0a47650
lightning: fix encode kvs size greater than 4.0g caused pebble panic (#…
ti-chi-bot authored May 26, 2021
1 parent 719285d commit 0a47650
Showing 7 changed files with 83 additions and 4 deletions.
8 changes: 8 additions & 0 deletions pkg/lightning/backend/kv/sql2kv.go
@@ -406,6 +406,14 @@ func (kvcodec *tableKVEncoder) Encode(
 	return kvPairs(pairs), nil
 }
 
+func (kvs kvPairs) Size() uint64 {
+	size := uint64(0)
+	for _, kv := range kvs {
+		size += uint64(len(kv.Key) + len(kv.Val))
+	}
+	return size
+}
+
 func (kvs kvPairs) ClassifyAndAppend(
 	data *Rows,
 	dataChecksum *verification.KVChecksum,
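For reference, here is the new size accounting in isolation, as a self-contained sketch with a stand-in pair type (lightning's real pair type is common.KvPair; the names and sample values below are illustrative only):

package main

import "fmt"

// kvPair stands in for lightning's common.KvPair.
type kvPair struct {
	Key []byte
	Val []byte
}

type kvPairs []kvPair

// Size mirrors the method added above: the summed byte length of
// every key and value in the batch.
func (kvs kvPairs) Size() uint64 {
	size := uint64(0)
	for _, kv := range kvs {
		size += uint64(len(kv.Key) + len(kv.Val))
	}
	return size
}

func main() {
	pairs := kvPairs{
		{Key: []byte("k1"), Val: []byte("value-1")},
		{Key: []byte("k2"), Val: []byte("value-2")},
	}
	fmt.Println(pairs.Size()) // 18: (2+7) bytes per pair, 2 pairs
}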
3 changes: 3 additions & 0 deletions pkg/lightning/backend/kv/types.go
@@ -35,6 +35,9 @@ type Row interface {
 		indices *Rows,
 		indexChecksum *verification.KVChecksum,
 	)
+
+	// Size represents the total kv size of this Row.
+	Size() uint64
 }
 
 // Rows represents a collection of encoded rows.
4 changes: 4 additions & 0 deletions pkg/lightning/backend/noop/noop.go
@@ -154,6 +154,10 @@ func (e noopEncoder) Encode(log.Logger, []types.Datum, int64, []int) (kv.Row, er
 
 type noopRow struct{}
 
+func (r noopRow) Size() uint64 {
+	return 0
+}
+
 func (r noopRow) ClassifyAndAppend(*kv.Rows, *verification.KVChecksum, *kv.Rows, *verification.KVChecksum) {
 }
 
4 changes: 4 additions & 0 deletions pkg/lightning/backend/tidb/tidb.go
@@ -93,6 +93,10 @@ func NewTiDBBackend(db *sql.DB, onDuplicate string) backend.Backend {
 	return backend.MakeBackend(&tidbBackend{db: db, onDuplicate: onDuplicate})
 }
 
+func (row tidbRow) Size() uint64 {
+	return uint64(len(row))
+}
+
 func (row tidbRow) ClassifyAndAppend(data *kv.Rows, checksum *verification.KVChecksum, _ *kv.Rows, _ *verification.KVChecksum) {
 	rows := (*data).(tidbRows)
 	// Cannot do `rows := data.(*tidbRows); *rows = append(*rows, row)`.
11 changes: 10 additions & 1 deletion pkg/lightning/restore/restore.go
@@ -2519,6 +2519,7 @@ func (cr *chunkRestore) encodeLoop(
 		canDeliver := false
 		kvPacket := make([]deliveredKVs, 0, maxKvPairsCnt)
 		var newOffset, rowID int64
+		var kvSize uint64
 	outLoop:
 		for !canDeliver {
 			readDurStart := time.Now()
@@ -2554,8 +2555,16 @@ func (cr *chunkRestore) encodeLoop(
 				return
 			}
 			kvPacket = append(kvPacket, deliveredKVs{kvs: kvs, columns: columnNames, offset: newOffset, rowID: rowID})
-			if len(kvPacket) >= maxKvPairsCnt || newOffset == cr.chunk.Chunk.EndOffset {
+			kvSize += kvs.Size()
+			failpoint.Inject("mock-kv-size", func(val failpoint.Value) {
+				kvSize += uint64(val.(int))
+			})
+			// Pebble cannot accept more than 4.0 GB of KV data in one batch.
+			// A SQL file whose rows each encode to more than 4 GB / maxKvPairsCnt
+			// of KV data would therefore panic Pebble, so also cap batches by size.
+			if kvSize >= minDeliverBytes || len(kvPacket) >= maxKvPairsCnt || newOffset == cr.chunk.Chunk.EndOffset {
 				canDeliver = true
+				kvSize = 0
 			}
 		}
 		encodeTotalDur += encodeDur
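Stripped of lightning's types, the delivery condition above reduces to a size-and-count-capped batching loop. A minimal, self-contained sketch (the parameter names echo minDeliverBytes and maxKvPairsCnt, but the values and types here are illustrative; lightning's real constants are defined elsewhere in restore.go):

package main

import "fmt"

// flushBySize delivers a batch whenever the accumulated byte size
// reaches maxBytes or the item count reaches maxCount, the same
// canDeliver condition used by encodeLoop above.
func flushBySize(items [][]byte, maxBytes uint64, maxCount int, deliver func([][]byte)) {
	batch := make([][]byte, 0, maxCount)
	var size uint64
	for _, item := range items {
		batch = append(batch, item)
		size += uint64(len(item))
		if size >= maxBytes || len(batch) >= maxCount {
			deliver(batch)
			batch = make([][]byte, 0, maxCount) // fresh slice: the old one was handed off
			size = 0
		}
	}
	if len(batch) > 0 {
		deliver(batch) // final partial batch
	}
}

func main() {
	items := [][]byte{[]byte("aaaa"), []byte("bbbb"), []byte("cc"), []byte("d")}
	// Flush at 6 bytes or 3 items, whichever comes first.
	flushBySize(items, 6, 3, func(b [][]byte) { fmt.Println(len(b), "items") })
}

Without the byte cap, one batch could grow to maxKvPairsCnt times the per-row KV size, which is how a single write batch could cross Pebble's 4 GB limit; the byte cap bounds each batch at roughly the threshold plus one row.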
51 changes: 51 additions & 0 deletions pkg/lightning/restore/restore_test.go
@@ -1150,6 +1150,57 @@ func (s *chunkRestoreSuite) TestEncodeLoopForcedError(c *C) {
 	c.Assert(kvsCh, HasLen, 0)
 }
 
+func (s *chunkRestoreSuite) TestEncodeLoopDeliverLimit(c *C) {
+	ctx := context.Background()
+	kvsCh := make(chan []deliveredKVs, 4)
+	deliverCompleteCh := make(chan deliverResult)
+	kvEncoder, err := kv.NewTableKVEncoder(s.tr.encTable, &kv.SessionOptions{
+		SQLMode:   s.cfg.TiDB.SQLMode,
+		Timestamp: 1234567898,
+	})
+	c.Assert(err, IsNil)
+
+	dir := c.MkDir()
+	fileName := "db.limit.000.csv"
+	err = ioutil.WriteFile(filepath.Join(dir, fileName), []byte("1,2,3\r\n4,5,6\r\n7,8,9\r"), 0o644)
+	c.Assert(err, IsNil)
+
+	store, err := storage.NewLocalStorage(dir)
+	c.Assert(err, IsNil)
+	cfg := config.NewConfig()
+
+	reader, err := store.Open(ctx, fileName)
+	c.Assert(err, IsNil)
+	w := worker.NewPool(ctx, 1, "io")
+	p := mydump.NewCSVParser(&cfg.Mydumper.CSV, reader, 111, w, false)
+	s.cr.parser = p
+
+	rc := &Controller{pauser: DeliverPauser, cfg: cfg}
+	c.Assert(failpoint.Enable(
+		"github.com/pingcap/br/pkg/lightning/restore/mock-kv-size", "return(110000000)"), IsNil)
+	_, _, err = s.cr.encodeLoop(ctx, kvsCh, s.tr, s.tr.logger, kvEncoder, deliverCompleteCh, rc)
+
+	// We have 3 KVs in total; with the failpoint injected,
+	// each delivered batch carries exactly one KV.
+	count := 0
+	for {
+		kvs, ok := <-kvsCh
+		if !ok {
+			break
+		}
+		count += 1
+		if count <= 3 {
+			c.Assert(kvs, HasLen, 1)
+		}
+		if count == 4 {
+			// encodeLoop sends an empty kvPacket before it exits,
+			// so we receive 4 batches and the last one is empty.
+			c.Assert(kvs, HasLen, 0)
+			break
+		}
+	}
+}
+
 func (s *chunkRestoreSuite) TestEncodeLoopDeliverErrored(c *C) {
 	ctx := context.Background()
 	kvsCh := make(chan []deliveredKVs)
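A note on the failpoint mechanics this test relies on: failpoint.Inject is a marker that failpoint-ctl rewrites into a failpoint.Eval call when building with failpoints enabled, and the term return(110000000) makes that evaluation yield the integer 110000000. A minimal sketch of the underlying API (the failpoint name below is made up for illustration):

package main

import (
	"fmt"

	"github.com/pingcap/failpoint"
)

func main() {
	const fp = "example/mock-kv-size" // illustrative name, not lightning's
	if err := failpoint.Enable(fp, "return(110000000)"); err != nil {
		panic(err)
	}
	defer func() { _ = failpoint.Disable(fp) }()

	// Roughly what an Inject marker becomes after the rewrite:
	// evaluate the failpoint and, if enabled, run the injected
	// body with the returned value.
	if val, err := failpoint.Eval(fp); err == nil {
		fmt.Println(val.(int)) // 110000000
	}
}

With every encoded row inflated by 110,000,000 bytes of pretend KV size, kvSize presumably exceeds minDeliverBytes immediately, so each of the 3 rows trips the size cap and is delivered in its own batch: three one-KV packets plus the final empty one.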
6 changes: 3 additions & 3 deletions tests/lightning_checkpoint_parquet/run.sh
@@ -41,9 +41,9 @@ set +e
 run_lightning -d "$DBPATH" --backend tidb --enable-checkpoint=1 2> /dev/null
 set -e
 run_sql 'SELECT count(*), sum(iVal) FROM `cppq_tsr`.tbl'
-check_contains "count(*): 32"
-# sum(0..31)
-check_contains "sum(iVal): 496"
+check_contains "count(*): 1"
+# sum(0)
+check_contains "sum(iVal): 0"
 
 # check chunk offset and update the checkpoint's current row id to a higher value so that
 # if the parser reads from the start, the generated rows will be different
