Skip to content

Commit

Permalink
add ttl job manager framework
Browse files Browse the repository at this point in the history
Signed-off-by: YangKeao <[email protected]>
  • Loading branch information
YangKeao committed Dec 12, 2022
1 parent 4731204 commit 41326e0
Show file tree
Hide file tree
Showing 22 changed files with 1,448 additions and 38 deletions.
2 changes: 1 addition & 1 deletion ddl/ttl.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func onTTLInfoChange(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err er

if ttlInfo != nil {
// if the TTL_ENABLE is not set explicitly, use the original value
if ttlInfoEnable == nil {
if ttlInfoEnable == nil && tblInfo.TTLInfo != nil {
ttlInfo.Enable = tblInfo.TTLInfo.Enable
}
tblInfo.TTLInfo = ttlInfo
Expand Down
1 change: 1 addition & 0 deletions domain/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ go_library(
"//statistics",
"//statistics/handle",
"//telemetry",
"//ttl/ttlworker",
"//types",
"//util",
"//util/chunk",
Expand Down
29 changes: 29 additions & 0 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import (
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/telemetry"
"github.com/pingcap/tidb/ttl/ttlworker"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/dbterror"
Expand Down Expand Up @@ -121,6 +122,7 @@ type Domain struct {
expiredTimeStamp4PC types.Time
logBackupAdvancer *daemon.OwnerDaemon
historicalStatsWorker *HistoricalStatsWorker
ttlJobManager *ttlworker.JobManager

serverID uint64
serverIDSession *concurrency.Session
Expand Down Expand Up @@ -1059,6 +1061,10 @@ func (do *Domain) Init(
return err
}

do.wg.Run(func() {
do.runTTLJobManager(ctx)
})

return nil
}

Expand Down Expand Up @@ -2446,6 +2452,29 @@ func (do *Domain) serverIDKeeper() {
}
}

func (do *Domain) runTTLJobManager(ctx context.Context) {
ttlJobManager := ttlworker.NewJobManager(do.ddl.GetID(), do.sysSessionPool, do.store)
ttlJobManager.Start()
do.ttlJobManager = ttlJobManager

// TODO: read the worker count from `do.sysVarCache` and resize the workers
ttlworker.ScanWorkersCount.Store(4)
ttlworker.DeleteWorkerCount.Store(4)

<-do.exit

ttlJobManager.Stop()
err := ttlJobManager.WaitStopped(ctx, 30*time.Second)
if err != nil {
logutil.BgLogger().Warn("fail to wait until the ttl job manager stop", zap.Error(err))
}
}

// TTLJobManager returns the ttl job manager on this domain
func (do *Domain) TTLJobManager() *ttlworker.JobManager {
return do.ttlJobManager
}

func init() {
initByLDFlagsForGlobalKill()
telemetry.GetDomainInfoSchema = func(ctx sessionctx.Context) infoschema.InfoSchema {
Expand Down
1 change: 0 additions & 1 deletion ttl/cache/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ go_library(
importpath = "github.com/pingcap/tidb/ttl/cache",
visibility = ["//visibility:public"],
deps = [
"//infoschema",
"//kv",
"//parser/ast",
"//parser/model",
Expand Down
4 changes: 4 additions & 0 deletions ttl/cache/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,7 @@ func (bc *baseCache) ShouldUpdate() bool {
func (bc *baseCache) SetInterval(interval time.Duration) {
bc.interval = interval
}

func (bc *baseCache) GetInterval() time.Duration {
return bc.interval
}
20 changes: 5 additions & 15 deletions ttl/cache/infoschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@ package cache
import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/ttl/session"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)
Expand All @@ -41,26 +39,18 @@ func NewInfoSchemaCache(updateInterval time.Duration) *InfoSchemaCache {
}

// Update updates the info schema cache
func (isc *InfoSchemaCache) Update(sctx sessionctx.Context) error {
is, ok := sctx.GetDomainInfoSchema().(infoschema.InfoSchema)
if !ok {
return errors.New("fail to get domain info schema from session")
}

ext, ok := is.(*infoschema.SessionExtendedInfoSchema)
if !ok {
return errors.New("fail to get extended info schema")
}
func (isc *InfoSchemaCache) Update(se session.Session) error {
is := se.SessionInfoSchema()

if isc.schemaVer == ext.SchemaMetaVersion() {
if isc.schemaVer == is.SchemaMetaVersion() {
return nil
}

newTables := make(map[int64]*PhysicalTable, len(isc.Tables))
for _, db := range is.AllSchemas() {
for _, tbl := range is.SchemaTables(db.Name) {
tblInfo := tbl.Meta()
if tblInfo.TTLInfo == nil || tblInfo.State != model.StatePublic {
if tblInfo.TTLInfo == nil || !tblInfo.TTLInfo.Enable || tblInfo.State != model.StatePublic {
continue
}

Expand Down
8 changes: 5 additions & 3 deletions ttl/cache/infoschema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/tidb/server"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/ttl/cache"
"github.com/pingcap/tidb/ttl/session"
"github.com/stretchr/testify/assert"
)

Expand All @@ -36,18 +37,19 @@ func TestInfoSchemaCache(t *testing.T) {
conn := server.CreateMockConn(t, sv)
sctx := conn.Context().Session
tk := testkit.NewTestKitWithSession(t, store, sctx)
se := session.NewSession(sctx, sctx, func() {})

isc := cache.NewInfoSchemaCache(time.Hour)

// test should update
assert.True(t, isc.ShouldUpdate())
assert.NoError(t, isc.Update(sctx))
assert.NoError(t, isc.Update(se))
assert.False(t, isc.ShouldUpdate())

// test new tables are synced
assert.Equal(t, 0, len(isc.Tables))
tk.MustExec("create table test.t(created_at datetime) ttl = created_at + INTERVAL 5 YEAR")
assert.NoError(t, isc.Update(sctx))
assert.NoError(t, isc.Update(se))
assert.Equal(t, 1, len(isc.Tables))
for _, table := range isc.Tables {
assert.Equal(t, "t", table.TableInfo.Name.L)
Expand All @@ -62,7 +64,7 @@ func TestInfoSchemaCache(t *testing.T) {
partition p1 values less than (2000)
)
`)
assert.NoError(t, isc.Update(sctx))
assert.NoError(t, isc.Update(se))
assert.Equal(t, 2, len(isc.Tables))
partitions := []string{}
for id, table := range isc.Tables {
Expand Down
2 changes: 1 addition & 1 deletion ttl/cache/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (t *PhysicalTable) ValidateKey(key []types.Datum) error {

// EvalExpireTime returns the expired time
func (t *PhysicalTable) EvalExpireTime(ctx context.Context, se session.Session, now time.Time) (expire time.Time, err error) {
tz := se.GetSessionVars().TimeZone
tz := se.GetSessionVars().Location()

expireExpr := t.TTLInfo.IntervalExprStr
unit := ast.TimeUnitType(t.TTLInfo.IntervalTimeUnit)
Expand Down
17 changes: 12 additions & 5 deletions ttl/cache/ttlstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package cache

import (
"context"
"fmt"
"time"

"github.com/pingcap/tidb/sessionctx"
Expand All @@ -35,12 +36,17 @@ const (
JobStatusCancelling = "cancelling"
// JobStatusCancelled means this job has been canceled successfully
JobStatusCancelled = "cancelled"
// JobStatusError means this job is in error status
JobStatusError = "error"
// JobStatusTimeout means this job has timeout
JobStatusTimeout = "timeout"
)

const selectFromTTLTableStatus = "SELECT table_id,parent_table_id,table_statistics,last_job_id,last_job_start_time,last_job_finish_time,last_job_ttl_expire,last_job_summary,current_job_id,current_job_owner_id,current_job_owner_addr,current_job_owner_hb_time,current_job_start_time,current_job_ttl_expire,current_job_state,current_job_status,current_job_status_update_time FROM mysql.tidb_ttl_table_status"

// SelectFromTTLTableStatusWithID returns an SQL statement to get the table status from table id
func SelectFromTTLTableStatusWithID(tableID int64) string {
return selectFromTTLTableStatus + fmt.Sprintf(" WHERE table_id = %d", tableID)
}

// TableStatus contains the corresponding information in the system table `mysql.tidb_ttl_table_status`
type TableStatus struct {
TableID int64
Expand Down Expand Up @@ -89,7 +95,7 @@ func (tsc *TableStatusCache) Update(ctx context.Context, se session.Session) err

newTables := make(map[int64]*TableStatus, len(rows))
for _, row := range rows {
status, err := rowToTableStatus(se, row)
status, err := RowToTableStatus(se, row)
if err != nil {
return err
}
Expand All @@ -101,9 +107,10 @@ func (tsc *TableStatusCache) Update(ctx context.Context, se session.Session) err
return nil
}

func rowToTableStatus(sctx sessionctx.Context, row chunk.Row) (*TableStatus, error) {
// RowToTableStatus converts a row to table status
func RowToTableStatus(sctx sessionctx.Context, row chunk.Row) (*TableStatus, error) {
var err error
timeZone := sctx.GetSessionVars().TimeZone
timeZone := sctx.GetSessionVars().Location()

status := &TableStatus{
TableID: row.GetInt64(0),
Expand Down
8 changes: 8 additions & 0 deletions ttl/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package session

import (
"context"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/infoschema"
Expand All @@ -41,6 +42,8 @@ type Session interface {
ResetWithGlobalTimeZone(ctx context.Context) error
// Close closes the session
Close()
// Now returns the current time in location specified by session var
Now() time.Time
}

type session struct {
Expand Down Expand Up @@ -145,3 +148,8 @@ func (s *session) Close() {
s.closeFn = nil
}
}

// Now returns the current time in the location of time_zone session var
func (s *session) Now() time.Time {
return time.Now().In(s.Context.GetSessionVars().Location())
}
11 changes: 11 additions & 0 deletions ttl/ttlworker/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,18 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "ttlworker",
srcs = [
"config.go",
"del.go",
"job.go",
"job_manager.go",
"scan.go",
"session.go",
"worker.go",
],
importpath = "github.com/pingcap/tidb/ttl/ttlworker",
visibility = ["//visibility:public"],
deps = [
"//kv",
"//parser/terror",
"//sessionctx",
"//sessionctx/variable",
Expand All @@ -22,9 +26,12 @@ go_library(
"//util/chunk",
"//util/logutil",
"//util/sqlexec",
"//util/timeutil",
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
"@org_golang_x_time//rate",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_multierr//:multierr",
"@org_uber_go_zap//:zap",
],
)
Expand All @@ -33,10 +40,13 @@ go_test(
name = "ttlworker_test",
srcs = [
"del_test.go",
"job_manager_test.go",
"job_test.go",
"scan_test.go",
"session_test.go",
],
embed = [":ttlworker"],
flaky = True,
deps = [
"//infoschema",
"//parser/ast",
Expand All @@ -49,6 +59,7 @@ go_test(
"//util/chunk",
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@org_golang_x_time//rate",
],
Expand Down
50 changes: 50 additions & 0 deletions ttl/ttlworker/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ttlworker

import (
"time"

"go.uber.org/atomic"
)

// TODO: the following functions should be put in the variable pkg to avoid cyclic dependency after adding variables for the TTL
// some of them are only used in test

const jobManagerLoopTickerInterval = 10 * time.Second

const updateInfoSchemaCacheInterval = time.Minute
const updateTTLTableStatusCacheInterval = 10 * time.Minute

const ttlInternalSQLTimeout = 30 * time.Second
const ttlJobTimeout = 6 * time.Hour

// TODO: add this variable to the sysvar
const ttlJobInterval = time.Hour

// TODO: add these variables to the sysvar
var ttlJobScheduleWindowStartTime, _ = time.Parse(timeFormat, "2006-01-02 00:00:00")
var ttlJobScheduleWindowEndTime, _ = time.Parse(timeFormat, "2006-01-02 23:59:00")

// TODO: migrate these two count to sysvar

// ScanWorkersCount defines the count of scan worker
var ScanWorkersCount = atomic.NewUint64(0)

// DeleteWorkerCount defines the count of delete worker
var DeleteWorkerCount = atomic.NewUint64(0)

const resizeWorkersInterval = 30 * time.Second
const splitScanCount = 64
2 changes: 1 addition & 1 deletion ttl/ttlworker/del_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func TestTTLDeleteTaskDoDelete(t *testing.T) {

for _, c := range cases {
invokes = 0
retryRows := c.task.doDelete(context.TODO(), s)
retryRows := c.task.doDelete(context.Background(), s)
require.Equal(t, 4, invokes)
if c.retryRows == nil {
require.Nil(t, retryRows)
Expand Down
Loading

0 comments on commit 41326e0

Please sign in to comment.