Skip to content

Commit

Permalink
ttl: fix some memory leak in TTL (pingcap#56935)
Browse files Browse the repository at this point in the history
  • Loading branch information
lcwangchao committed Oct 30, 2024
1 parent a7df4f9 commit 8a7e269
Show file tree
Hide file tree
Showing 9 changed files with 66 additions and 6 deletions.
1 change: 1 addition & 0 deletions pkg/ttl/ttlworker/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ go_test(
"//pkg/ttl/metrics",
"//pkg/ttl/session",
"//pkg/types",
"//pkg/util",
"//pkg/util/chunk",
"//pkg/util/logutil",
"//pkg/util/mock",
Expand Down
1 change: 1 addition & 0 deletions pkg/ttl/ttlworker/del.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ func (w *ttlDeleteWorker) loop() error {
if err != nil {
return err
}
defer se.Close()

ctx := metrics.CtxWithPhaseTracer(w.baseWorker.ctx, tracer)

Expand Down
1 change: 1 addition & 0 deletions pkg/ttl/ttlworker/del_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ func TestTTLDeleteTaskWorker(t *testing.T) {
s := newMockSession(t)
pool := newMockSessionPool(t)
pool.se = s
defer pool.AssertNoSessionInUse()

sqlMap := make(map[string]struct{})
s.executeSQL = func(ctx context.Context, sql string, args ...any) ([]chunk.Row, error) {
Expand Down
1 change: 1 addition & 0 deletions pkg/ttl/ttlworker/job_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1284,6 +1284,7 @@ func (a *managerJobAdapter) Now() (time.Time, error) {
if err != nil {
return time.Time{}, err
}
defer se.Close()

tz, err := se.GlobalTimeZone(context.TODO())
if err != nil {
Expand Down
39 changes: 36 additions & 3 deletions pkg/ttl/ttlworker/job_manager_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/pingcap/tidb/pkg/ttl/metrics"
"github.com/pingcap/tidb/pkg/ttl/session"
"github.com/pingcap/tidb/pkg/ttl/ttlworker"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/logutil"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -1063,9 +1064,37 @@ func TestDelayMetrics(t *testing.T) {
checkRecord(records, "t5", emptyTime)
}

type poolTestWrapper struct {
util.SessionPool
inuse atomic.Int64
}

func wrapPoolForTest(pool util.SessionPool) *poolTestWrapper {
return &poolTestWrapper{SessionPool: pool}
}

func (w *poolTestWrapper) Get() (pools.Resource, error) {
r, err := w.SessionPool.Get()
if err == nil {
w.inuse.Add(1)
}
return r, err
}

func (w *poolTestWrapper) Put(r pools.Resource) {
w.inuse.Add(-1)
w.SessionPool.Put(r)
}

func (w *poolTestWrapper) AssertNoSessionInUse(t *testing.T) {
require.Zero(t, w.inuse.Load())
}

func TestManagerJobAdapterCanSubmitJob(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
adapter := ttlworker.NewManagerJobAdapter(store, dom.SysSessionPool(), nil)
pool := wrapPoolForTest(dom.SysSessionPool())
defer pool.AssertNoSessionInUse(t)
adapter := ttlworker.NewManagerJobAdapter(store, pool, nil)

// stop TTLJobManager to avoid unnecessary job schedule and make test stable
dom.TTLJobManager().Stop()
Expand Down Expand Up @@ -1194,7 +1223,9 @@ func TestManagerJobAdapterSubmitJob(t *testing.T) {

func TestManagerJobAdapterGetJob(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
adapter := ttlworker.NewManagerJobAdapter(store, dom.SysSessionPool(), nil)
pool := wrapPoolForTest(dom.SysSessionPool())
defer pool.AssertNoSessionInUse(t)
adapter := ttlworker.NewManagerJobAdapter(store, pool, nil)

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand Down Expand Up @@ -1289,7 +1320,9 @@ func TestManagerJobAdapterGetJob(t *testing.T) {

func TestManagerJobAdapterNow(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
adapter := ttlworker.NewManagerJobAdapter(store, dom.SysSessionPool(), nil)
pool := wrapPoolForTest(dom.SysSessionPool())
defer pool.AssertNoSessionInUse(t)
adapter := ttlworker.NewManagerJobAdapter(store, pool, nil)

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand Down
1 change: 1 addition & 0 deletions pkg/ttl/ttlworker/job_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ func TestOnTimerTick(t *testing.T) {

now := time.UnixMilli(3600 * 24)
syncer := NewTTLTimerSyncer(m.sessPool, timerapi.NewDefaultTimerClient(timerStore))
defer m.sessPool.(*mockSessionPool).AssertNoSessionInUse()
syncer.nowFunc = func() time.Time {
return now
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/ttl/ttlworker/scan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ func TestScanWorkerSchedule(t *testing.T) {

tbl := newMockTTLTbl(t, "t1")
w := NewMockScanWorker(t)
defer w.sessPoll.AssertNoSessionInUse()
w.setOneRowResult(tbl, 7)
defer w.stopWithWait()

Expand Down Expand Up @@ -180,6 +181,7 @@ func TestScanWorkerScheduleWithFailedTask(t *testing.T) {

tbl := newMockTTLTbl(t, "t1")
w := NewMockScanWorker(t)
defer w.sessPoll.AssertNoSessionInUse()
w.clearInfoSchema()
defer w.stopWithWait()

Expand Down Expand Up @@ -392,6 +394,7 @@ func (t *mockScanTask) execSQL(_ context.Context, sql string, _ ...any) ([]chunk

func TestScanTaskDoScan(t *testing.T) {
task := newMockScanTask(t, 3)
defer task.sessPool.AssertNoSessionInUse()
task.ctx = cache.SetMockExpireTime(task.ctx, time.Now())
task.sqlRetry[1] = scanTaskExecuteSQLMaxRetry
task.runDoScanForTest(3, "")
Expand All @@ -413,6 +416,7 @@ func TestScanTaskDoScan(t *testing.T) {
func TestScanTaskCheck(t *testing.T) {
tbl := newMockTTLTbl(t, "t1")
pool := newMockSessionPool(t, tbl)
defer pool.AssertNoSessionInUse()
pool.se.rows = newMockRows(t, types.NewFieldType(mysql.TypeInt24)).Append(12).Rows()
ctx := cache.SetMockExpireTime(context.Background(), time.Unix(100, 0))

Expand Down
16 changes: 15 additions & 1 deletion pkg/ttl/ttlworker/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,18 +125,28 @@ type mockSessionPool struct {
t *testing.T
se *mockSession
lastSession *mockSession
inuse atomic.Int64
}

func (p *mockSessionPool) Get() (pools.Resource, error) {
se := *(p.se)
p.lastSession = &se
p.lastSession.pool = p
p.inuse.Add(1)
return p.lastSession, nil
}

func (p *mockSessionPool) Put(pools.Resource) {}
func (p *mockSessionPool) Put(pools.Resource) {
p.inuse.Add(-1)
}

func (p *mockSessionPool) AssertNoSessionInUse() {
require.Equal(p.t, int64(0), p.inuse.Load())
}

func newMockSessionPool(t *testing.T, tbl ...*cache.PhysicalTable) *mockSessionPool {
return &mockSessionPool{
t: t,
se: newMockSession(t, tbl...),
}
}
Expand All @@ -152,6 +162,7 @@ type mockSession struct {
resetTimeZoneCalls int
closed bool
commitErr error
pool *mockSessionPool
}

func newMockSession(t *testing.T, tbl ...*cache.PhysicalTable) *mockSession {
Expand Down Expand Up @@ -223,6 +234,9 @@ func (s *mockSession) GlobalTimeZone(_ context.Context) (*time.Location, error)

func (s *mockSession) Close() {
s.closed = true
if s.pool != nil {
s.pool.Put(s)
}
}

func (s *mockSession) Now() time.Time {
Expand Down
8 changes: 6 additions & 2 deletions pkg/ttl/ttlworker/timer_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ func TestTTLManualTriggerOneTimer(t *testing.T) {
defer timerStore.Close()
var zeroWatermark time.Time
cli := timerapi.NewDefaultTimerClient(timerStore)
sync := ttlworker.NewTTLTimerSyncer(do.SysSessionPool(), cli)
pool := wrapPoolForTest(do.SysSessionPool())
defer pool.AssertNoSessionInUse(t)
sync := ttlworker.NewTTLTimerSyncer(pool, cli)

tk.MustExec("set @@global.tidb_ttl_job_enable=0")
tk.MustExec("create table tp1(a int, t timestamp) TTL=`t`+interval 1 HOUR ttl_job_interval='3h' partition by range(a) (" +
Expand Down Expand Up @@ -237,7 +239,9 @@ func TestTTLTimerSync(t *testing.T) {
insertTTLTableStatusWatermark(t, do, tk, "test", "tp1", "p1", wm2, true)

cli := timerapi.NewDefaultTimerClient(timerStore)
sync := ttlworker.NewTTLTimerSyncer(do.SysSessionPool(), cli)
pool := wrapPoolForTest(do.SysSessionPool())
defer pool.AssertNoSessionInUse(t)
sync := ttlworker.NewTTLTimerSyncer(pool, cli)

lastSyncTime, lastSyncVer := sync.GetLastSyncInfo()
require.True(t, lastSyncTime.IsZero())
Expand Down

0 comments on commit 8a7e269

Please sign in to comment.