Skip to content

Commit

Permalink
import: fix memory leak (#39332)
Browse files Browse the repository at this point in the history
close #39331
  • Loading branch information
dsdashun authored Nov 28, 2022
1 parent 7f632be commit 5c9570d
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 3 deletions.
4 changes: 3 additions & 1 deletion br/pkg/lightning/backend/kv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,14 @@ go_test(
name = "kv_test",
timeout = "short",
srcs = [
"session_internal_test.go",
"session_test.go",
"sql2kv_test.go",
],
embed = [":kv"],
flaky = True,
race = "on",
deps = [
":kv",
"//br/pkg/lightning/common",
"//br/pkg/lightning/log",
"//br/pkg/lightning/verification",
Expand All @@ -69,6 +70,7 @@ go_test(
"//tablecodec",
"//types",
"//util/mock",
"@com_github_docker_go_units//:go-units",
"@com_github_stretchr_testify//require",
"@org_uber_go_zap//:zap",
"@org_uber_go_zap//zapcore",
Expand Down
24 changes: 22 additions & 2 deletions br/pkg/lightning/backend/kv/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ import (
"go.uber.org/zap"
)

const maxAvailableBufSize int = 20

// invalidIterator is a trimmed down Iterator type which is invalid.
type invalidIterator struct {
kv.Iterator
Expand Down Expand Up @@ -92,15 +94,33 @@ func (mb *kvMemBuf) Recycle(buf *bytesBuf) {
buf.idx = 0
buf.cap = len(buf.buf)
mb.Lock()
if len(mb.availableBufs) >= maxAvailableBufSize {
// too many byte buffers, evict one byte buffer and continue
evictedByteBuf := mb.availableBufs[0]
evictedByteBuf.destroy()
mb.availableBufs = mb.availableBufs[1:]
}
mb.availableBufs = append(mb.availableBufs, buf)
mb.Unlock()
}

func (mb *kvMemBuf) AllocateBuf(size int) {
mb.Lock()
size = mathutil.Max(units.MiB, int(utils.NextPowerOfTwo(int64(size)))*2)
if len(mb.availableBufs) > 0 && mb.availableBufs[0].cap >= size {
mb.buf = mb.availableBufs[0]
var (
existingBuf *bytesBuf
existingBufIdx int
)
for i, buf := range mb.availableBufs {
if buf.cap >= size {
existingBuf = buf
existingBufIdx = i
break
}
}
if existingBuf != nil {
mb.buf = existingBuf
mb.availableBufs[existingBufIdx] = mb.availableBufs[0]
mb.availableBufs = mb.availableBufs[1:]
} else {
mb.buf = newBytesBuf(size)
Expand Down
126 changes: 126 additions & 0 deletions br/pkg/lightning/backend/kv/session_internal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kv

import (
"testing"

"github.com/docker/go-units"
"github.com/stretchr/testify/require"
)

func TestKVMemBufInterweaveAllocAndRecycle(t *testing.T) {
type testCase struct {
AllocSizes []int
FinalAvailableByteBufCaps []int
}
for _, tc := range []testCase{
{
AllocSizes: []int{
1 * units.MiB,
2 * units.MiB,
3 * units.MiB,
4 * units.MiB,
5 * units.MiB,
},
// [2] => [2,4] => [2,4,8] => [4,2,8] => [4,2,8,16]
FinalAvailableByteBufCaps: []int{
4 * units.MiB,
2 * units.MiB,
8 * units.MiB,
16 * units.MiB,
},
},
{
AllocSizes: []int{
5 * units.MiB,
4 * units.MiB,
3 * units.MiB,
2 * units.MiB,
1 * units.MiB,
},
// [16] => [16] => [16] => [16] => [16]
FinalAvailableByteBufCaps: []int{16 * units.MiB},
},
{
AllocSizes: []int{5, 4, 3, 2, 1},
// [1] => [1] => [1] => [1] => [1]
FinalAvailableByteBufCaps: []int{1 * units.MiB},
},
{
AllocSizes: []int{
1 * units.MiB,
2 * units.MiB,
3 * units.MiB,
2 * units.MiB,
1 * units.MiB,
5 * units.MiB,
},
// [2] => [2,4] => [2,4,8] => [2,8,4] => [8,4,2] => [8,4,2,16]
FinalAvailableByteBufCaps: []int{
8 * units.MiB,
4 * units.MiB,
2 * units.MiB,
16 * units.MiB,
},
},
} {
testKVMemBuf := &kvMemBuf{}
for _, allocSize := range tc.AllocSizes {
testKVMemBuf.AllocateBuf(allocSize)
testKVMemBuf.Recycle(testKVMemBuf.buf)
}
require.Equal(t, len(tc.FinalAvailableByteBufCaps), len(testKVMemBuf.availableBufs))
for i, bb := range testKVMemBuf.availableBufs {
require.Equal(t, tc.FinalAvailableByteBufCaps[i], bb.cap)
}
}
}

func TestKVMemBufBatchAllocAndRecycle(t *testing.T) {
type testCase struct {
AllocSizes []int
FinalAvailableByteBufCaps []int
}
testKVMemBuf := &kvMemBuf{}
bBufs := []*bytesBuf{}
for i := 0; i < maxAvailableBufSize; i++ {
testKVMemBuf.AllocateBuf(1 * units.MiB)
bBufs = append(bBufs, testKVMemBuf.buf)
}
for i := 0; i < maxAvailableBufSize; i++ {
testKVMemBuf.AllocateBuf(2 * units.MiB)
bBufs = append(bBufs, testKVMemBuf.buf)
}
for _, bb := range bBufs {
testKVMemBuf.Recycle(bb)
}
require.Equal(t, maxAvailableBufSize, len(testKVMemBuf.availableBufs))
for _, bb := range testKVMemBuf.availableBufs {
require.Equal(t, 4*units.MiB, bb.cap)
}
bBufs = bBufs[:0]
for i := 0; i < maxAvailableBufSize; i++ {
testKVMemBuf.AllocateBuf(1 * units.MiB)
bb := testKVMemBuf.buf
require.Equal(t, 4*units.MiB, bb.cap)
bBufs = append(bBufs, bb)
require.Equal(t, maxAvailableBufSize-i-1, len(testKVMemBuf.availableBufs))
}
for _, bb := range bBufs {
testKVMemBuf.Recycle(bb)
}
require.Equal(t, maxAvailableBufSize, len(testKVMemBuf.availableBufs))
}

0 comments on commit 5c9570d

Please sign in to comment.