Skip to content

Commit

Permalink
import: fix memory leak (#39332) (#39409)
Browse files Browse the repository at this point in the history
close #39331
  • Loading branch information
ti-chi-bot authored Jan 18, 2023
1 parent e96bc9b commit 30c22da
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 2 deletions.
24 changes: 22 additions & 2 deletions br/pkg/lightning/backend/kv/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ import (
"go.uber.org/zap"
)

const maxAvailableBufSize int = 20

// invalidIterator is a trimmed down Iterator type which is invalid.
type invalidIterator struct {
kv.Iterator
Expand Down Expand Up @@ -92,15 +94,33 @@ func (mb *kvMemBuf) Recycle(buf *bytesBuf) {
buf.idx = 0
buf.cap = len(buf.buf)
mb.Lock()
if len(mb.availableBufs) >= maxAvailableBufSize {
// too many byte buffers, evict one byte buffer and continue
evictedByteBuf := mb.availableBufs[0]
evictedByteBuf.destroy()
mb.availableBufs = mb.availableBufs[1:]
}
mb.availableBufs = append(mb.availableBufs, buf)
mb.Unlock()
}

func (mb *kvMemBuf) AllocateBuf(size int) {
mb.Lock()
size = mathutil.Max(units.MiB, int(utils.NextPowerOfTwo(int64(size)))*2)
if len(mb.availableBufs) > 0 && mb.availableBufs[0].cap >= size {
mb.buf = mb.availableBufs[0]
var (
existingBuf *bytesBuf
existingBufIdx int
)
for i, buf := range mb.availableBufs {
if buf.cap >= size {
existingBuf = buf
existingBufIdx = i
break
}
}
if existingBuf != nil {
mb.buf = existingBuf
mb.availableBufs[existingBufIdx] = mb.availableBufs[0]
mb.availableBufs = mb.availableBufs[1:]
} else {
mb.buf = newBytesBuf(size)
Expand Down
101 changes: 101 additions & 0 deletions br/pkg/lightning/backend/kv/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package kv
import (
"testing"

"github.com/docker/go-units"
"github.com/pingcap/tidb/parser/mysql"
"github.com/stretchr/testify/require"
)
Expand All @@ -26,3 +27,103 @@ func TestSession(t *testing.T) {
_, err := session.Txn(true)
require.NoError(t, err)
}

func TestKVMemBufInterweaveAllocAndRecycle(t *testing.T) {
type testCase struct {
AllocSizes []int
FinalAvailableByteBufCaps []int
}
for _, tc := range []testCase{
{
AllocSizes: []int{
1 * units.MiB,
2 * units.MiB,
3 * units.MiB,
4 * units.MiB,
5 * units.MiB,
},
// [2] => [2,4] => [2,4,8] => [4,2,8] => [4,2,8,16]
FinalAvailableByteBufCaps: []int{
4 * units.MiB,
2 * units.MiB,
8 * units.MiB,
16 * units.MiB,
},
},
{
AllocSizes: []int{
5 * units.MiB,
4 * units.MiB,
3 * units.MiB,
2 * units.MiB,
1 * units.MiB,
},
// [16] => [16] => [16] => [16] => [16]
FinalAvailableByteBufCaps: []int{16 * units.MiB},
},
{
AllocSizes: []int{5, 4, 3, 2, 1},
// [1] => [1] => [1] => [1] => [1]
FinalAvailableByteBufCaps: []int{1 * units.MiB},
},
{
AllocSizes: []int{
1 * units.MiB,
2 * units.MiB,
3 * units.MiB,
2 * units.MiB,
1 * units.MiB,
5 * units.MiB,
},
// [2] => [2,4] => [2,4,8] => [2,8,4] => [8,4,2] => [8,4,2,16]
FinalAvailableByteBufCaps: []int{
8 * units.MiB,
4 * units.MiB,
2 * units.MiB,
16 * units.MiB,
},
},
} {
testKVMemBuf := &kvMemBuf{}
for _, allocSize := range tc.AllocSizes {
testKVMemBuf.AllocateBuf(allocSize)
testKVMemBuf.Recycle(testKVMemBuf.buf)
}
require.Equal(t, len(tc.FinalAvailableByteBufCaps), len(testKVMemBuf.availableBufs))
for i, bb := range testKVMemBuf.availableBufs {
require.Equal(t, tc.FinalAvailableByteBufCaps[i], bb.cap)
}
}
}

func TestKVMemBufBatchAllocAndRecycle(t *testing.T) {
testKVMemBuf := &kvMemBuf{}
bBufs := []*bytesBuf{}
for i := 0; i < maxAvailableBufSize; i++ {
testKVMemBuf.AllocateBuf(1 * units.MiB)
bBufs = append(bBufs, testKVMemBuf.buf)
}
for i := 0; i < maxAvailableBufSize; i++ {
testKVMemBuf.AllocateBuf(2 * units.MiB)
bBufs = append(bBufs, testKVMemBuf.buf)
}
for _, bb := range bBufs {
testKVMemBuf.Recycle(bb)
}
require.Equal(t, maxAvailableBufSize, len(testKVMemBuf.availableBufs))
for _, bb := range testKVMemBuf.availableBufs {
require.Equal(t, 4*units.MiB, bb.cap)
}
bBufs = bBufs[:0]
for i := 0; i < maxAvailableBufSize; i++ {
testKVMemBuf.AllocateBuf(1 * units.MiB)
bb := testKVMemBuf.buf
require.Equal(t, 4*units.MiB, bb.cap)
bBufs = append(bBufs, bb)
require.Equal(t, maxAvailableBufSize-i-1, len(testKVMemBuf.availableBufs))
}
for _, bb := range bBufs {
testKVMemBuf.Recycle(bb)
}
require.Equal(t, maxAvailableBufSize, len(testKVMemBuf.availableBufs))
}

0 comments on commit 30c22da

Please sign in to comment.