From 45e837ddfb50bf303a38cdbfb9eb0399bd2d146e Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Thu, 9 Nov 2023 18:42:53 +0800 Subject: [PATCH] This is an automated cherry-pick of #48430 Signed-off-by: ti-chi-bot --- pkg/executor/historical_stats_test.go | 402 ++++++++++++++++++++++++ pkg/statistics/handle/storage/gc.go | 420 ++++++++++++++++++++++++++ 2 files changed, 822 insertions(+) create mode 100644 pkg/executor/historical_stats_test.go create mode 100644 pkg/statistics/handle/storage/gc.go diff --git a/pkg/executor/historical_stats_test.go b/pkg/executor/historical_stats_test.go new file mode 100644 index 0000000000000..65e54f20f5140 --- /dev/null +++ b/pkg/executor/historical_stats_test.go @@ -0,0 +1,402 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package executor_test + +import ( + "encoding/json" + "fmt" + "strconv" + "testing" + "time" + + "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tidb/pkg/sessionctx/variable" + "github.com/pingcap/tidb/pkg/statistics/handle/storage" + "github.com/pingcap/tidb/pkg/statistics/handle/util" + "github.com/pingcap/tidb/pkg/testkit" + "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/oracle" +) + +func TestRecordHistoryStatsAfterAnalyze(t *testing.T) { + failpoint.Enable("github.com/pingcap/tidb/pkg/domain/sendHistoricalStats", "return(true)") + defer failpoint.Disable("github.com/pingcap/tidb/pkg/domain/sendHistoricalStats") + store, dom := testkit.CreateMockStoreAndDomain(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("set @@tidb_analyze_version = 2") + tk.MustExec("set global tidb_enable_historical_stats = 0") + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int, b varchar(10))") + + h := dom.StatsHandle() + is := dom.InfoSchema() + tableInfo, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + + // 1. switch off the tidb_enable_historical_stats, and there is no records in table `mysql.stats_history` + rows := tk.MustQuery(fmt.Sprintf("select count(*) from mysql.stats_history where table_id = '%d'", tableInfo.Meta().ID)).Rows() + num, _ := strconv.Atoi(rows[0][0].(string)) + require.Equal(t, num, 0) + + tk.MustExec("analyze table t with 2 topn") + rows = tk.MustQuery(fmt.Sprintf("select count(*) from mysql.stats_history where table_id = '%d'", tableInfo.Meta().ID)).Rows() + num, _ = strconv.Atoi(rows[0][0].(string)) + require.Equal(t, num, 0) + + // 2. switch on the tidb_enable_historical_stats and do analyze + tk.MustExec("set global tidb_enable_historical_stats = 1") + defer tk.MustExec("set global tidb_enable_historical_stats = 0") + tk.MustExec("analyze table t with 2 topn") + // dump historical stats + hsWorker := dom.GetHistoricalStatsWorker() + tblID := hsWorker.GetOneHistoricalStatsTable() + err = hsWorker.DumpHistoricalStats(tblID, h) + require.Nil(t, err) + rows = tk.MustQuery(fmt.Sprintf("select count(*) from mysql.stats_history where table_id = '%d'", tableInfo.Meta().ID)).Rows() + num, _ = strconv.Atoi(rows[0][0].(string)) + require.GreaterOrEqual(t, num, 1) + + // 3. dump current stats json + dumpJSONTable, err := h.DumpStatsToJSON("test", tableInfo.Meta(), nil, true) + require.NoError(t, err) + jsOrigin, _ := json.Marshal(dumpJSONTable) + + // 4. get the historical stats json + rows = tk.MustQuery(fmt.Sprintf("select * from mysql.stats_history where table_id = '%d' and create_time = ("+ + "select create_time from mysql.stats_history where table_id = '%d' order by create_time desc limit 1) "+ + "order by seq_no", tableInfo.Meta().ID, tableInfo.Meta().ID)).Rows() + num = len(rows) + require.GreaterOrEqual(t, num, 1) + data := make([][]byte, num) + for i, row := range rows { + data[i] = []byte(row[1].(string)) + } + jsonTbl, err := storage.BlocksToJSONTable(data) + require.NoError(t, err) + jsCur, err := json.Marshal(jsonTbl) + require.NoError(t, err) + // 5. historical stats must be equal to the current stats + require.JSONEq(t, string(jsOrigin), string(jsCur)) +} + +func TestRecordHistoryStatsMetaAfterAnalyze(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("set @@tidb_analyze_version = 2") + tk.MustExec("set global tidb_enable_historical_stats = 0") + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int, b int)") + tk.MustExec("analyze table test.t") + + h := dom.StatsHandle() + is := dom.InfoSchema() + tableInfo, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + + // 1. switch off the tidb_enable_historical_stats, and there is no record in table `mysql.stats_meta_history` + tk.MustQuery(fmt.Sprintf("select count(*) from mysql.stats_meta_history where table_id = '%d'", tableInfo.Meta().ID)).Check(testkit.Rows("0")) + // insert demo tuples, and there is no record either. + insertNums := 5 + for i := 0; i < insertNums; i++ { + tk.MustExec("insert into test.t (a,b) values (1,1), (2,2), (3,3)") + err := h.DumpStatsDeltaToKV(false) + require.NoError(t, err) + } + tk.MustQuery(fmt.Sprintf("select count(*) from mysql.stats_meta_history where table_id = '%d'", tableInfo.Meta().ID)).Check(testkit.Rows("0")) + + // 2. switch on the tidb_enable_historical_stats and insert tuples to produce count/modifyCount delta change. + tk.MustExec("set global tidb_enable_historical_stats = 1") + defer tk.MustExec("set global tidb_enable_historical_stats = 0") + + for i := 0; i < insertNums; i++ { + tk.MustExec("insert into test.t (a,b) values (1,1), (2,2), (3,3)") + err := h.DumpStatsDeltaToKV(false) + require.NoError(t, err) + } + tk.MustQuery(fmt.Sprintf("select modify_count, count from mysql.stats_meta_history where table_id = '%d' order by create_time", tableInfo.Meta().ID)).Sort().Check( + testkit.Rows("18 18", "21 21", "24 24", "27 27", "30 30")) + tk.MustQuery(fmt.Sprintf("select distinct source from mysql.stats_meta_history where table_id = '%d'", tableInfo.Meta().ID)).Sort().Check(testkit.Rows("flush stats")) + + // assert delete + tk.MustExec("delete from test.t where test.t.a = 1") + err = h.DumpStatsDeltaToKV(true) + require.NoError(t, err) + tk.MustQuery(fmt.Sprintf("select modify_count, count from mysql.stats_meta where table_id = '%d' order by create_time desc", tableInfo.Meta().ID)).Sort().Check( + testkit.Rows("40 20")) + tk.MustQuery(fmt.Sprintf("select modify_count, count from mysql.stats_meta_history where table_id = '%d' order by create_time desc limit 1", tableInfo.Meta().ID)).Sort().Check( + testkit.Rows("40 20")) + + // assert update + tk.MustExec("update test.t set test.t.b = 4 where test.t.a = 2") + err = h.DumpStatsDeltaToKV(true) + require.NoError(t, err) + tk.MustQuery(fmt.Sprintf("select modify_count, count from mysql.stats_meta where table_id = '%d' order by create_time desc", tableInfo.Meta().ID)).Sort().Check( + testkit.Rows("50 20")) + tk.MustQuery(fmt.Sprintf("select modify_count, count from mysql.stats_meta_history where table_id = '%d' order by create_time desc limit 1", tableInfo.Meta().ID)).Sort().Check( + testkit.Rows("50 20")) +} + +func TestGCHistoryStatsAfterDropTable(t *testing.T) { + failpoint.Enable("github.com/pingcap/tidb/pkg/domain/sendHistoricalStats", "return(true)") + defer failpoint.Disable("github.com/pingcap/tidb/pkg/domain/sendHistoricalStats") + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("set global tidb_enable_historical_stats = 1") + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int, b varchar(10))") + tk.MustExec("analyze table test.t") + is := dom.InfoSchema() + tableInfo, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + // dump historical stats + h := dom.StatsHandle() + hsWorker := dom.GetHistoricalStatsWorker() + tblID := hsWorker.GetOneHistoricalStatsTable() + err = hsWorker.DumpHistoricalStats(tblID, h) + require.Nil(t, err) + + // assert the records of history stats table + tk.MustQuery(fmt.Sprintf("select count(*) from mysql.stats_meta_history where table_id = '%d' order by create_time", + tableInfo.Meta().ID)).Check(testkit.Rows("1")) + tk.MustQuery(fmt.Sprintf("select count(*) from mysql.stats_history where table_id = '%d'", + tableInfo.Meta().ID)).Check(testkit.Rows("1")) + // drop the table and gc stats + tk.MustExec("drop table t") + is = dom.InfoSchema() + h.GCStats(is, 0) + + // assert stats_history tables delete the record of dropped table + tk.MustQuery(fmt.Sprintf("select count(*) from mysql.stats_meta_history where table_id = '%d' order by create_time", + tableInfo.Meta().ID)).Check(testkit.Rows("0")) + tk.MustQuery(fmt.Sprintf("select count(*) from mysql.stats_history where table_id = '%d'", + tableInfo.Meta().ID)).Check(testkit.Rows("0")) +} + +func TestAssertHistoricalStatsAfterAlterTable(t *testing.T) { + failpoint.Enable("github.com/pingcap/tidb/pkg/domain/sendHistoricalStats", "return(true)") + defer failpoint.Disable("github.com/pingcap/tidb/pkg/domain/sendHistoricalStats") + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("set global tidb_enable_historical_stats = 1") + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int, b varchar(10),c int, KEY `idx` (`c`))") + tk.MustExec("analyze table test.t") + is := dom.InfoSchema() + tableInfo, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + // dump historical stats + h := dom.StatsHandle() + hsWorker := dom.GetHistoricalStatsWorker() + tblID := hsWorker.GetOneHistoricalStatsTable() + err = hsWorker.DumpHistoricalStats(tblID, h) + require.Nil(t, err) + + time.Sleep(1 * time.Second) + snapshot := oracle.GoTimeToTS(time.Now()) + jsTable, _, err := h.DumpHistoricalStatsBySnapshot("test", tableInfo.Meta(), snapshot) + require.NoError(t, err) + require.NotNil(t, jsTable) + require.NotEqual(t, jsTable.Version, uint64(0)) + originVersion := jsTable.Version + + // assert historical stats non-change after drop column + tk.MustExec("alter table t drop column b") + h.GCStats(is, 0) + snapshot = oracle.GoTimeToTS(time.Now()) + jsTable, _, err = h.DumpHistoricalStatsBySnapshot("test", tableInfo.Meta(), snapshot) + require.NoError(t, err) + require.NotNil(t, jsTable) + require.Equal(t, jsTable.Version, originVersion) + + // assert historical stats non-change after drop index + tk.MustExec("alter table t drop index idx") + h.GCStats(is, 0) + snapshot = oracle.GoTimeToTS(time.Now()) + jsTable, _, err = h.DumpHistoricalStatsBySnapshot("test", tableInfo.Meta(), snapshot) + require.NoError(t, err) + require.NotNil(t, jsTable) + require.Equal(t, jsTable.Version, originVersion) +} + +func TestGCOutdatedHistoryStats(t *testing.T) { + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/domain/sendHistoricalStats", "return(true)")) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/domain/sendHistoricalStats")) + }() + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("set global tidb_enable_historical_stats = 1") + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int, b varchar(10))") + tk.MustExec("analyze table test.t") + is := dom.InfoSchema() + tableInfo, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + // dump historical stats + h := dom.StatsHandle() + hsWorker := dom.GetHistoricalStatsWorker() + tblID := hsWorker.GetOneHistoricalStatsTable() + err = hsWorker.DumpHistoricalStats(tblID, h) + require.Nil(t, err) + + // assert the records of history stats table + tk.MustQuery(fmt.Sprintf("select count(*) from mysql.stats_meta_history where table_id = '%d' order by create_time", + tableInfo.Meta().ID)).Check(testkit.Rows("1")) + tk.MustQuery(fmt.Sprintf("select count(*) from mysql.stats_history where table_id = '%d'", + tableInfo.Meta().ID)).Check(testkit.Rows("1")) + + tk.MustExec("set @@global.tidb_historical_stats_duration = '1s'") + duration := variable.HistoricalStatsDuration.Load() + fmt.Println(duration.String()) + time.Sleep(2 * time.Second) + err = dom.StatsHandle().ClearOutdatedHistoryStats() + require.NoError(t, err) + // assert the records of history stats table + tk.MustQuery(fmt.Sprintf("select count(*) from mysql.stats_meta_history where table_id = '%d' order by create_time", + tableInfo.Meta().ID)).Check(testkit.Rows("0")) + tk.MustQuery(fmt.Sprintf("select count(*) from mysql.stats_history where table_id = '%d'", + tableInfo.Meta().ID)).Check(testkit.Rows("0")) +} + +func TestPartitionTableHistoricalStats(t *testing.T) { + failpoint.Enable("github.com/pingcap/tidb/pkg/domain/sendHistoricalStats", "return(true)") + defer failpoint.Disable("github.com/pingcap/tidb/pkg/domain/sendHistoricalStats") + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("set global tidb_enable_historical_stats = 1") + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec(`CREATE TABLE t (a int, b int, index idx(b)) +PARTITION BY RANGE ( a ) ( +PARTITION p0 VALUES LESS THAN (6) +)`) + tk.MustExec("delete from mysql.stats_history") + + tk.MustExec("analyze table test.t") + // dump historical stats + h := dom.StatsHandle() + hsWorker := dom.GetHistoricalStatsWorker() + + // assert global table and partition table be dumped + tblID := hsWorker.GetOneHistoricalStatsTable() + err := hsWorker.DumpHistoricalStats(tblID, h) + require.NoError(t, err) + tblID = hsWorker.GetOneHistoricalStatsTable() + err = hsWorker.DumpHistoricalStats(tblID, h) + require.NoError(t, err) + tk.MustQuery("select count(*) from mysql.stats_history").Check(testkit.Rows("2")) +} + +func TestDumpHistoricalStatsByTable(t *testing.T) { + failpoint.Enable("github.com/pingcap/tidb/pkg/domain/sendHistoricalStats", "return(true)") + defer failpoint.Disable("github.com/pingcap/tidb/pkg/domain/sendHistoricalStats") + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("set global tidb_enable_historical_stats = 1") + tk.MustExec("set @@tidb_partition_prune_mode='static'") + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec(`CREATE TABLE t (a int, b int, index idx(b)) +PARTITION BY RANGE ( a ) ( +PARTITION p0 VALUES LESS THAN (6) +)`) + // dump historical stats + h := dom.StatsHandle() + + tk.MustExec("analyze table t") + is := dom.InfoSchema() + tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + require.NotNil(t, tbl) + + // dump historical stats + hsWorker := dom.GetHistoricalStatsWorker() + // only partition p0 stats will be dumped in static mode + tblID := hsWorker.GetOneHistoricalStatsTable() + require.NotEqual(t, tblID, -1) + err = hsWorker.DumpHistoricalStats(tblID, h) + require.NoError(t, err) + tblID = hsWorker.GetOneHistoricalStatsTable() + require.Equal(t, tblID, int64(-1)) + + time.Sleep(1 * time.Second) + snapshot := oracle.GoTimeToTS(time.Now()) + jsTable, _, err := h.DumpHistoricalStatsBySnapshot("test", tbl.Meta(), snapshot) + require.NoError(t, err) + require.NotNil(t, jsTable) + // only has p0 stats + require.NotNil(t, jsTable.Partitions["p0"]) + require.Nil(t, jsTable.Partitions[util.TiDBGlobalStats]) + + // change static to dynamic then assert + tk.MustExec("set @@tidb_partition_prune_mode='dynamic'") + tk.MustExec("analyze table t") + require.NoError(t, err) + // global and p0's stats will be dumped + tblID = hsWorker.GetOneHistoricalStatsTable() + require.NotEqual(t, tblID, -1) + err = hsWorker.DumpHistoricalStats(tblID, h) + require.NoError(t, err) + tblID = hsWorker.GetOneHistoricalStatsTable() + require.NotEqual(t, tblID, -1) + err = hsWorker.DumpHistoricalStats(tblID, h) + require.NoError(t, err) + time.Sleep(1 * time.Second) + snapshot = oracle.GoTimeToTS(time.Now()) + jsTable, _, err = h.DumpHistoricalStatsBySnapshot("test", tbl.Meta(), snapshot) + require.NoError(t, err) + require.NotNil(t, jsTable) + // has both global and p0 stats + require.NotNil(t, jsTable.Partitions["p0"]) + require.NotNil(t, jsTable.Partitions[util.TiDBGlobalStats]) +} + +func TestDumpHistoricalStatsFallback(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("set global tidb_enable_historical_stats = 0") + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec(`CREATE TABLE t (a int, b int, index idx(b)) +PARTITION BY RANGE ( a ) ( +PARTITION p0 VALUES LESS THAN (6) +)`) + // dump historical stats + tk.MustExec("analyze table t") + is := dom.InfoSchema() + tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + require.NotNil(t, tbl) + + // dump historical stats + hsWorker := dom.GetHistoricalStatsWorker() + tblID := hsWorker.GetOneHistoricalStatsTable() + // assert no historical stats task generated + require.Equal(t, tblID, int64(-1)) + tk.MustExec("set global tidb_enable_historical_stats = 1") + h := dom.StatsHandle() + jt, _, err := h.DumpHistoricalStatsBySnapshot("test", tbl.Meta(), oracle.GoTimeToTS(time.Now())) + require.NoError(t, err) + require.NotNil(t, jt) + require.False(t, jt.IsHistoricalStats) +} diff --git a/pkg/statistics/handle/storage/gc.go b/pkg/statistics/handle/storage/gc.go new file mode 100644 index 0000000000000..155830c59af34 --- /dev/null +++ b/pkg/statistics/handle/storage/gc.go @@ -0,0 +1,420 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "context" + "encoding/json" + "fmt" + "strconv" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/tidb/pkg/infoschema" + "github.com/pingcap/tidb/pkg/parser/terror" + "github.com/pingcap/tidb/pkg/sessionctx" + "github.com/pingcap/tidb/pkg/sessionctx/variable" + "github.com/pingcap/tidb/pkg/statistics" + "github.com/pingcap/tidb/pkg/statistics/handle/cache" + "github.com/pingcap/tidb/pkg/statistics/handle/lockstats" + "github.com/pingcap/tidb/pkg/statistics/handle/util" + "github.com/pingcap/tidb/pkg/util/chunk" + "github.com/pingcap/tidb/pkg/util/logutil" + "github.com/pingcap/tidb/pkg/util/sqlexec" + "github.com/tikv/client-go/v2/oracle" + "go.uber.org/zap" +) + +// statsGCImpl implements StatsGC interface. +type statsGCImpl struct { + statsHandle util.StatsHandle +} + +// NewStatsGC creates a new StatsGC. +func NewStatsGC(statsHandle util.StatsHandle) util.StatsGC { + return &statsGCImpl{ + statsHandle: statsHandle, + } +} + +// GCStats will garbage collect the useless stats' info. +// For dropped tables, we will first update their version +// so that other tidb could know that table is deleted. +func (gc *statsGCImpl) GCStats(is infoschema.InfoSchema, ddlLease time.Duration) (err error) { + return util.CallWithSCtx(gc.statsHandle.SPool(), func(sctx sessionctx.Context) error { + return GCStats(sctx, gc.statsHandle, is, ddlLease) + }) +} + +// ClearOutdatedHistoryStats clear outdated historical stats. +// Only for test. +func (gc *statsGCImpl) ClearOutdatedHistoryStats() error { + return util.CallWithSCtx(gc.statsHandle.SPool(), ClearOutdatedHistoryStats) +} + +// DeleteTableStatsFromKV deletes table statistics from kv. +// A statsID refers to statistic of a table or a partition. +func (gc *statsGCImpl) DeleteTableStatsFromKV(statsIDs []int64) (err error) { + return util.CallWithSCtx(gc.statsHandle.SPool(), func(sctx sessionctx.Context) error { + return DeleteTableStatsFromKV(sctx, statsIDs) + }, util.FlagWrapTxn) +} + +// GCStats will garbage collect the useless stats' info. +// For dropped tables, we will first update their version +// so that other tidb could know that table is deleted. +func GCStats(sctx sessionctx.Context, + statsHandle util.StatsHandle, + is infoschema.InfoSchema, ddlLease time.Duration) (err error) { + // To make sure that all the deleted tables' schema and stats info have been acknowledged to all tidb, + // we only garbage collect version before 10 lease. + lease := max(statsHandle.Lease(), ddlLease) + offset := util.DurationToTS(10 * lease) + now := oracle.GoTimeToTS(time.Now()) + if now < offset { + return nil + } + + // Get the last gc time. + gcVer := now - offset + lastGC, err := getLastGCTimestamp(sctx) + if err != nil { + return err + } + defer func() { + if err != nil { + return + } + err = writeGCTimestampToKV(sctx, gcVer) + }() + + rows, _, err := util.ExecRows(sctx, "select table_id from mysql.stats_meta where version >= %? and version < %?", lastGC, gcVer) + if err != nil { + return errors.Trace(err) + } + for _, row := range rows { + if err := gcTableStats(sctx, statsHandle, is, row.GetInt64(0)); err != nil { + return errors.Trace(err) + } + _, existed := is.TableByID(row.GetInt64(0)) + if !existed { + if err := gcHistoryStatsFromKV(sctx, row.GetInt64(0)); err != nil { + return errors.Trace(err) + } + } + } + + if err := ClearOutdatedHistoryStats(sctx); err != nil { + logutil.BgLogger().Warn("failed to gc outdated historical stats", + zap.Duration("duration", variable.HistoricalStatsDuration.Load()), + zap.Error(err)) + } + + return removeDeletedExtendedStats(sctx, gcVer) +} + +// DeleteTableStatsFromKV deletes table statistics from kv. +// A statsID refers to statistic of a table or a partition. +func DeleteTableStatsFromKV(sctx sessionctx.Context, statsIDs []int64) (err error) { + startTS, err := util.GetStartTS(sctx) + if err != nil { + return errors.Trace(err) + } + for _, statsID := range statsIDs { + // We only update the version so that other tidb will know that this table is deleted. + if _, err = util.Exec(sctx, "update mysql.stats_meta set version = %? where table_id = %? ", startTS, statsID); err != nil { + return err + } + if _, err = util.Exec(sctx, "delete from mysql.stats_histograms where table_id = %?", statsID); err != nil { + return err + } + if _, err = util.Exec(sctx, "delete from mysql.stats_buckets where table_id = %?", statsID); err != nil { + return err + } + if _, err = util.Exec(sctx, "delete from mysql.stats_top_n where table_id = %?", statsID); err != nil { + return err + } + if _, err = util.Exec(sctx, "update mysql.stats_extended set version = %?, status = %? where table_id = %? and status in (%?, %?)", startTS, statistics.ExtendedStatsDeleted, statsID, statistics.ExtendedStatsAnalyzed, statistics.ExtendedStatsInited); err != nil { + return err + } + if _, err = util.Exec(sctx, "delete from mysql.stats_fm_sketch where table_id = %?", statsID); err != nil { + return err + } + if _, err = util.Exec(sctx, "delete from mysql.column_stats_usage where table_id = %?", statsID); err != nil { + return err + } + if _, err = util.Exec(sctx, "delete from mysql.analyze_options where table_id = %?", statsID); err != nil { + return err + } + if _, err = util.Exec(sctx, lockstats.DeleteLockSQL, statsID); err != nil { + return err + } + } + return nil +} + +func forCount(total int64, batch int64) int64 { + result := total / batch + if total%batch > 0 { + result++ + } + return result +} + +// ClearOutdatedHistoryStats clear outdated historical stats +func ClearOutdatedHistoryStats(sctx sessionctx.Context) error { + sql := "select count(*) from mysql.stats_meta_history use index (idx_create_time) where create_time <= NOW() - INTERVAL %? SECOND" + rs, err := util.Exec(sctx, sql, variable.HistoricalStatsDuration.Load().Seconds()) + if err != nil { + return err + } + if rs == nil { + return nil + } + var rows []chunk.Row + defer terror.Call(rs.Close) + if rows, err = sqlexec.DrainRecordSet(context.Background(), rs, 8); err != nil { + return errors.Trace(err) + } + count := rows[0].GetInt64(0) + if count > 0 { + for n := int64(0); n < forCount(count, int64(1000)); n++ { + sql = "delete from mysql.stats_meta_history use index (idx_create_time) where create_time <= NOW() - INTERVAL %? SECOND limit 1000 " + _, err = util.Exec(sctx, sql, variable.HistoricalStatsDuration.Load().Seconds()) + if err != nil { + return err + } + } + for n := int64(0); n < forCount(count, int64(50)); n++ { + sql = "delete from mysql.stats_history use index (idx_create_time) where create_time <= NOW() - INTERVAL %? SECOND limit 50 " + _, err = util.Exec(sctx, sql, variable.HistoricalStatsDuration.Load().Seconds()) + return err + } + logutil.BgLogger().Info("clear outdated historical stats") + } + return nil +} + +// gcHistoryStatsFromKV delete history stats from kv. +func gcHistoryStatsFromKV(sctx sessionctx.Context, physicalID int64) (err error) { + sql := "delete from mysql.stats_history where table_id = %?" + _, err = util.Exec(sctx, sql, physicalID) + if err != nil { + return errors.Trace(err) + } + sql = "delete from mysql.stats_meta_history where table_id = %?" + _, err = util.Exec(sctx, sql, physicalID) + return err +} + +// deleteHistStatsFromKV deletes all records about a column or an index and updates version. +func deleteHistStatsFromKV(sctx sessionctx.Context, physicalID int64, histID int64, isIndex int) (err error) { + startTS, err := util.GetStartTS(sctx) + if err != nil { + return errors.Trace(err) + } + // First of all, we update the version. If this table doesn't exist, it won't have any problem. Because we cannot delete anything. + if _, err = util.Exec(sctx, "update mysql.stats_meta set version = %? where table_id = %? ", startTS, physicalID); err != nil { + return err + } + // delete histogram meta + if _, err = util.Exec(sctx, "delete from mysql.stats_histograms where table_id = %? and hist_id = %? and is_index = %?", physicalID, histID, isIndex); err != nil { + return err + } + // delete top n data + if _, err = util.Exec(sctx, "delete from mysql.stats_top_n where table_id = %? and hist_id = %? and is_index = %?", physicalID, histID, isIndex); err != nil { + return err + } + // delete all buckets + if _, err = util.Exec(sctx, "delete from mysql.stats_buckets where table_id = %? and hist_id = %? and is_index = %?", physicalID, histID, isIndex); err != nil { + return err + } + // delete all fm sketch + if _, err := util.Exec(sctx, "delete from mysql.stats_fm_sketch where table_id = %? and hist_id = %? and is_index = %?", physicalID, histID, isIndex); err != nil { + return err + } + if isIndex == 0 { + // delete the record in mysql.column_stats_usage + if _, err = util.Exec(sctx, "delete from mysql.column_stats_usage where table_id = %? and column_id = %?", physicalID, histID); err != nil { + return err + } + } + return nil +} + +// removeDeletedExtendedStats removes deleted extended stats. +func removeDeletedExtendedStats(sctx sessionctx.Context, version uint64) (err error) { + const sql = "delete from mysql.stats_extended where status = %? and version < %?" + _, err = util.Exec(sctx, sql, statistics.ExtendedStatsDeleted, version) + return +} + +// gcTableStats GC this table's stats. +func gcTableStats(sctx sessionctx.Context, + statsHandler util.StatsHandle, + is infoschema.InfoSchema, physicalID int64) error { + rows, _, err := util.ExecRows(sctx, "select is_index, hist_id from mysql.stats_histograms where table_id = %?", physicalID) + if err != nil { + return errors.Trace(err) + } + // The table has already been deleted in stats and acknowledged to all tidb, + // we can safely remove the meta info now. + if len(rows) == 0 { + _, _, err = util.ExecRows(sctx, "delete from mysql.stats_meta where table_id = %?", physicalID) + if err != nil { + return errors.Trace(err) + } + cache.TableRowStatsCache.Invalidate(physicalID) + } + tbl, ok := statsHandler.TableInfoByID(is, physicalID) + if !ok { + logutil.BgLogger().Info("remove stats in GC due to dropped table", zap.Int64("table_id", physicalID)) + return util.WrapTxn(sctx, func(sctx sessionctx.Context) error { + return errors.Trace(DeleteTableStatsFromKV(sctx, []int64{physicalID})) + }) + } + tblInfo := tbl.Meta() + for _, row := range rows { + isIndex, histID := row.GetInt64(0), row.GetInt64(1) + find := false + if isIndex == 1 { + for _, idx := range tblInfo.Indices { + if idx.ID == histID { + find = true + break + } + } + } else { + for _, col := range tblInfo.Columns { + if col.ID == histID { + find = true + break + } + } + } + if !find { + err := util.WrapTxn(sctx, func(sctx sessionctx.Context) error { + return errors.Trace(deleteHistStatsFromKV(sctx, physicalID, histID, int(isIndex))) + }) + if err != nil { + return errors.Trace(err) + } + } + } + + // Mark records in mysql.stats_extended as `deleted`. + rows, _, err = util.ExecRows(sctx, "select name, column_ids from mysql.stats_extended where table_id = %? and status in (%?, %?)", physicalID, statistics.ExtendedStatsAnalyzed, statistics.ExtendedStatsInited) + if err != nil { + return errors.Trace(err) + } + if len(rows) == 0 { + return nil + } + for _, row := range rows { + statsName, strColIDs := row.GetString(0), row.GetString(1) + var colIDs []int64 + err = json.Unmarshal([]byte(strColIDs), &colIDs) + if err != nil { + logutil.BgLogger().Debug("decode column IDs failed", zap.String("column_ids", strColIDs), zap.Error(err)) + return errors.Trace(err) + } + for _, colID := range colIDs { + found := false + for _, col := range tblInfo.Columns { + if colID == col.ID { + found = true + break + } + } + if !found { + logutil.BgLogger().Info("mark mysql.stats_extended record as 'deleted' in GC due to dropped columns", zap.String("table_name", tblInfo.Name.L), zap.Int64("table_id", physicalID), zap.String("stats_name", statsName), zap.Int64("dropped_column_id", colID)) + err = statsHandler.MarkExtendedStatsDeleted(statsName, physicalID, true) + if err != nil { + logutil.BgLogger().Debug("update stats_extended status failed", zap.String("stats_name", statsName), zap.Error(err)) + return errors.Trace(err) + } + break + } + } + } + return nil +} + +const gcLastTSVarName = "tidb_stats_gc_last_ts" + +// getLastGCTimestamp loads the last gc time from mysql.tidb. +func getLastGCTimestamp(sctx sessionctx.Context) (uint64, error) { + rows, _, err := util.ExecRows(sctx, "SELECT HIGH_PRIORITY variable_value FROM mysql.tidb WHERE variable_name=%?", gcLastTSVarName) + if err != nil { + return 0, errors.Trace(err) + } + if len(rows) == 0 { + return 0, nil + } + lastGcTSString := rows[0].GetString(0) + lastGcTS, err := strconv.ParseUint(lastGcTSString, 10, 64) + if err != nil { + return 0, errors.Trace(err) + } + return lastGcTS, nil +} + +// writeGCTimestampToKV write the GC timestamp to the storage. +func writeGCTimestampToKV(sctx sessionctx.Context, newTS uint64) error { + _, _, err := util.ExecRows(sctx, + "insert into mysql.tidb (variable_name, variable_value) values (%?, %?) on duplicate key update variable_value = %?", + gcLastTSVarName, + newTS, + newTS, + ) + return err +} + +// MarkExtendedStatsDeleted update the status of mysql.stats_extended to be `deleted` and the version of mysql.stats_meta. +func MarkExtendedStatsDeleted(sctx sessionctx.Context, + statsCache util.StatsCache, + statsName string, tableID int64, ifExists bool) (statsVer uint64, err error) { + rows, _, err := util.ExecRows(sctx, "SELECT name FROM mysql.stats_extended WHERE name = %? and table_id = %? and status in (%?, %?)", statsName, tableID, statistics.ExtendedStatsInited, statistics.ExtendedStatsAnalyzed) + if err != nil { + return 0, errors.Trace(err) + } + if len(rows) == 0 { + if ifExists { + return 0, nil + } + return 0, fmt.Errorf("extended statistics '%s' for the specified table does not exist", statsName) + } + if len(rows) > 1 { + logutil.BgLogger().Warn("unexpected duplicate extended stats records found", zap.String("name", statsName), zap.Int64("table_id", tableID)) + } + + defer func() { + if err == nil { + removeExtendedStatsItem(statsCache, tableID, statsName) + } + }() + version, err := util.GetStartTS(sctx) + if err != nil { + return 0, errors.Trace(err) + } + if _, err = util.Exec(sctx, "UPDATE mysql.stats_meta SET version = %? WHERE table_id = %?", version, tableID); err != nil { + return 0, err + } + statsVer = version + if _, err = util.Exec(sctx, "UPDATE mysql.stats_extended SET version = %?, status = %? WHERE name = %? and table_id = %?", version, statistics.ExtendedStatsDeleted, statsName, tableID); err != nil { + return 0, err + } + return +}