diff --git a/br/pkg/lightning/backend/kv/BUILD.bazel b/br/pkg/lightning/backend/kv/BUILD.bazel index ea2cfefc2440e..b0da8a0e7deb4 100644 --- a/br/pkg/lightning/backend/kv/BUILD.bazel +++ b/br/pkg/lightning/backend/kv/BUILD.bazel @@ -46,13 +46,14 @@ go_test( name = "kv_test", timeout = "short", srcs = [ + "session_internal_test.go", "session_test.go", "sql2kv_test.go", ], + embed = [":kv"], flaky = True, race = "on", deps = [ - ":kv", "//br/pkg/lightning/common", "//br/pkg/lightning/log", "//br/pkg/lightning/verification", @@ -69,6 +70,7 @@ go_test( "//tablecodec", "//types", "//util/mock", + "@com_github_docker_go_units//:go-units", "@com_github_stretchr_testify//require", "@org_uber_go_zap//:zap", "@org_uber_go_zap//zapcore", diff --git a/br/pkg/lightning/backend/kv/session.go b/br/pkg/lightning/backend/kv/session.go index 1cc261b677fe4..a8c5b5970cdf8 100644 --- a/br/pkg/lightning/backend/kv/session.go +++ b/br/pkg/lightning/backend/kv/session.go @@ -38,6 +38,8 @@ import ( "go.uber.org/zap" ) +const maxAvailableBufSize int = 20 + // invalidIterator is a trimmed down Iterator type which is invalid. type invalidIterator struct { kv.Iterator @@ -92,6 +94,12 @@ func (mb *kvMemBuf) Recycle(buf *bytesBuf) { buf.idx = 0 buf.cap = len(buf.buf) mb.Lock() + if len(mb.availableBufs) >= maxAvailableBufSize { + // too many byte buffers, evict one byte buffer and continue + evictedByteBuf := mb.availableBufs[0] + evictedByteBuf.destroy() + mb.availableBufs = mb.availableBufs[1:] + } mb.availableBufs = append(mb.availableBufs, buf) mb.Unlock() } @@ -99,8 +107,20 @@ func (mb *kvMemBuf) Recycle(buf *bytesBuf) { func (mb *kvMemBuf) AllocateBuf(size int) { mb.Lock() size = mathutil.Max(units.MiB, int(utils.NextPowerOfTwo(int64(size)))*2) - if len(mb.availableBufs) > 0 && mb.availableBufs[0].cap >= size { - mb.buf = mb.availableBufs[0] + var ( + existingBuf *bytesBuf + existingBufIdx int + ) + for i, buf := range mb.availableBufs { + if buf.cap >= size { + existingBuf = buf + existingBufIdx = i + break + } + } + if existingBuf != nil { + mb.buf = existingBuf + mb.availableBufs[existingBufIdx] = mb.availableBufs[0] mb.availableBufs = mb.availableBufs[1:] } else { mb.buf = newBytesBuf(size) diff --git a/br/pkg/lightning/backend/kv/session_internal_test.go b/br/pkg/lightning/backend/kv/session_internal_test.go new file mode 100644 index 0000000000000..97ebd8cc82d1b --- /dev/null +++ b/br/pkg/lightning/backend/kv/session_internal_test.go @@ -0,0 +1,126 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kv + +import ( + "testing" + + "github.com/docker/go-units" + "github.com/stretchr/testify/require" +) + +func TestKVMemBufInterweaveAllocAndRecycle(t *testing.T) { + type testCase struct { + AllocSizes []int + FinalAvailableByteBufCaps []int + } + for _, tc := range []testCase{ + { + AllocSizes: []int{ + 1 * units.MiB, + 2 * units.MiB, + 3 * units.MiB, + 4 * units.MiB, + 5 * units.MiB, + }, + // [2] => [2,4] => [2,4,8] => [4,2,8] => [4,2,8,16] + FinalAvailableByteBufCaps: []int{ + 4 * units.MiB, + 2 * units.MiB, + 8 * units.MiB, + 16 * units.MiB, + }, + }, + { + AllocSizes: []int{ + 5 * units.MiB, + 4 * units.MiB, + 3 * units.MiB, + 2 * units.MiB, + 1 * units.MiB, + }, + // [16] => [16] => [16] => [16] => [16] + FinalAvailableByteBufCaps: []int{16 * units.MiB}, + }, + { + AllocSizes: []int{5, 4, 3, 2, 1}, + // [1] => [1] => [1] => [1] => [1] + FinalAvailableByteBufCaps: []int{1 * units.MiB}, + }, + { + AllocSizes: []int{ + 1 * units.MiB, + 2 * units.MiB, + 3 * units.MiB, + 2 * units.MiB, + 1 * units.MiB, + 5 * units.MiB, + }, + // [2] => [2,4] => [2,4,8] => [2,8,4] => [8,4,2] => [8,4,2,16] + FinalAvailableByteBufCaps: []int{ + 8 * units.MiB, + 4 * units.MiB, + 2 * units.MiB, + 16 * units.MiB, + }, + }, + } { + testKVMemBuf := &kvMemBuf{} + for _, allocSize := range tc.AllocSizes { + testKVMemBuf.AllocateBuf(allocSize) + testKVMemBuf.Recycle(testKVMemBuf.buf) + } + require.Equal(t, len(tc.FinalAvailableByteBufCaps), len(testKVMemBuf.availableBufs)) + for i, bb := range testKVMemBuf.availableBufs { + require.Equal(t, tc.FinalAvailableByteBufCaps[i], bb.cap) + } + } +} + +func TestKVMemBufBatchAllocAndRecycle(t *testing.T) { + type testCase struct { + AllocSizes []int + FinalAvailableByteBufCaps []int + } + testKVMemBuf := &kvMemBuf{} + bBufs := []*bytesBuf{} + for i := 0; i < maxAvailableBufSize; i++ { + testKVMemBuf.AllocateBuf(1 * units.MiB) + bBufs = append(bBufs, testKVMemBuf.buf) + } + for i := 0; i < maxAvailableBufSize; i++ { + testKVMemBuf.AllocateBuf(2 * units.MiB) + bBufs = append(bBufs, testKVMemBuf.buf) + } + for _, bb := range bBufs { + testKVMemBuf.Recycle(bb) + } + require.Equal(t, maxAvailableBufSize, len(testKVMemBuf.availableBufs)) + for _, bb := range testKVMemBuf.availableBufs { + require.Equal(t, 4*units.MiB, bb.cap) + } + bBufs = bBufs[:0] + for i := 0; i < maxAvailableBufSize; i++ { + testKVMemBuf.AllocateBuf(1 * units.MiB) + bb := testKVMemBuf.buf + require.Equal(t, 4*units.MiB, bb.cap) + bBufs = append(bBufs, bb) + require.Equal(t, maxAvailableBufSize-i-1, len(testKVMemBuf.availableBufs)) + } + for _, bb := range bBufs { + testKVMemBuf.Recycle(bb) + } + require.Equal(t, maxAvailableBufSize, len(testKVMemBuf.availableBufs)) +}