statistics: process DML changes in the new PQ
Rustin170506 committed Sep 29, 2024
1 parent 828b461 commit c2d9ffe
Showing 5 changed files with 307 additions and 3 deletions.
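The commit message is terse, so here is a compact, hedged sketch of the pattern the diff below introduces: the priority queue's background loop gains a second ticker that periodically folds DML-change deltas into the queue, guarded by a version watermark that is read before the scan and only ever advanced forward. All names and types in this sketch are hypothetical stand-ins for illustration; they are not the TiDB API.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// statsEntry stands in for a cached statistics.Table: a physical table ID plus
// the version at which its stats were last written.
type statsEntry struct {
	physicalID int64
	version    uint64
}

// queue stands in for AnalysisPriorityQueueV2 and remembers the last version
// it has already folded into its jobs.
type queue struct {
	lastFetched atomic.Uint64
}

// processDMLChanges rebuilds a job for every entry written after the
// watermark, then advances the watermark (never backwards).
func (q *queue) processDMLChanges(newMaxVersion uint64, entries []statsEntry) {
	last := q.lastFetched.Load()
	for _, e := range entries {
		if e.version > last {
			fmt.Printf("rebuild job for table %d (version %d)\n", e.physicalID, e.version)
		}
	}
	if newMaxVersion > last {
		q.lastFetched.Store(newMaxVersion)
	}
}

func main() {
	q := &queue{}
	ticker := time.NewTicker(50 * time.Millisecond) // stands in for the 2-minute interval
	defer ticker.Stop()

	entries := []statsEntry{{physicalID: 1, version: 5}, {physicalID: 2, version: 9}}
	for i := 0; i < 2; i++ {
		<-ticker.C
		// The new maximum version is read before scanning, mirroring the
		// ordering the commit relies on to avoid missing concurrent writes.
		q.processDMLChanges(10, entries)
	}
}
```

The second tick prints nothing because the watermark has already advanced past both entries, which is the deduplication the queue needs between fetch intervals.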
@@ -54,7 +54,7 @@ go_test(
"static_partitioned_table_analysis_job_test.go",
],
flaky = True,
-shard_count = 32,
+shard_count = 33,
deps = [
":priorityqueue",
"//pkg/meta/model",
218 changes: 218 additions & 0 deletions pkg/statistics/handle/autoanalyze/priorityqueue/queue_v2.go
@@ -21,8 +21,11 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/ddl/notifier"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/exec"
"github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/internal/heap"
statslogutil "github.com/pingcap/tidb/pkg/statistics/handle/logutil"
@@ -34,6 +37,7 @@

const (
lastAnalysisDurationRefreshInterval = time.Minute * 10
dmlChangesFetchInterval = time.Minute * 2
)

// AnalysisPriorityQueueV2 is a priority queue for TableAnalysisJobs.
@@ -47,6 +51,9 @@ type AnalysisPriorityQueueV2 struct {
cancel context.CancelFunc
wg util.WaitGroupWrapper

// lastDMLUpdateFetchTimestamp is the timestamp of the last DML update fetch.
lastDMLUpdateFetchTimestamp atomic.Uint64

// initialized is a flag to check if the queue is initialized.
initialized atomic.Bool
}
@@ -112,6 +119,9 @@ func (pq *AnalysisPriorityQueueV2) init() error {
}
timeWindow := pq.autoAnalysisTimeWindow.Load().(*AutoAnalysisTimeWindow)

// We need to fetch the next check version with offset before fetching all tables and building analysis jobs.
// Otherwise, we may miss some DML changes that happen during the process.
nextCheckVersionWithOffset := pq.statsHandle.GetNextCheckVersionWithOffset()
err = FetchAllTablesAndBuildAnalysisJobs(
sctx,
parameters,
@@ -122,6 +132,9 @@
if err != nil {
return errors.Trace(err)
}

// Update the last fetch timestamp of DML updates.
pq.lastDMLUpdateFetchTimestamp.Store(nextCheckVersionWithOffset)
return nil
}, statsutil.FlagWrapTxn)
}
@@ -134,6 +147,8 @@ func (pq *AnalysisPriorityQueueV2) run() {
}
}()

dmlChangesFetchInterval := time.NewTicker(dmlChangesFetchInterval)
defer dmlChangesFetchInterval.Stop()
timeRefreshInterval := time.NewTicker(lastAnalysisDurationRefreshInterval)
defer timeRefreshInterval.Stop()

@@ -142,13 +157,216 @@
case <-pq.ctx.Done():
statslogutil.StatsLogger().Info("Priority queue stopped")
return
case <-dmlChangesFetchInterval.C:
statslogutil.StatsLogger().Info("Start to fetch DML changes of jobs")
pq.ProcessDMLChanges()
case <-timeRefreshInterval.C:
statslogutil.StatsLogger().Info("Start to refresh last analysis durations of jobs")
pq.RefreshLastAnalysisDuration()
}
}
}

// ProcessDMLChanges scans the stats cache for tables whose statistics changed since the last fetch and creates or updates their analysis jobs.
func (pq *AnalysisPriorityQueueV2) ProcessDMLChanges() {
if err := statsutil.CallWithSCtx(pq.statsHandle.SPool(), func(sctx sessionctx.Context) error {
parameters := exec.GetAutoAnalyzeParameters(sctx)
if err := pq.setAutoAnalysisTimeWindow(parameters); err != nil {
return errors.Trace(err)
}
if !pq.IsWithinTimeWindow() {
statslogutil.StatsLogger().Debug("Not within the auto analyze time window, skip processing DML changes")
return nil
}

start := time.Now()
defer func() {
statslogutil.StatsLogger().Info("DML changes processed", zap.Duration("duration", time.Since(start)))
}()

// We need to fetch the next check version with offset before fetching new DML changes.
// Otherwise, we may miss some DML changes that happen during the process.
newMaxVersion := pq.statsHandle.GetNextCheckVersionWithOffset()
values := pq.statsHandle.Values()
lastFetchTimestamp := pq.lastDMLUpdateFetchTimestamp.Load()
for _, value := range values {
if value.Version > lastFetchTimestamp {
// TODO: consider locked tables and partitions.
err := pq.processTableStats(sctx, value, parameters)
if err != nil {
statslogutil.StatsLogger().Error(
"Failed to process table stats",
zap.Error(err),
zap.Int64("tableID", value.PhysicalID),
)
}
}
}

// Only advance the watermark if we have seen a newer version.
if newMaxVersion > lastFetchTimestamp {
statslogutil.StatsLogger().Info("Updating last fetch timestamp", zap.Uint64("new_max_version", newMaxVersion))
pq.lastDMLUpdateFetchTimestamp.Store(newMaxVersion)
}
return nil
}, statsutil.FlagWrapTxn); err != nil {
statslogutil.StatsLogger().Error("Failed to process DML changes", zap.Error(err))
}
}

func (pq *AnalysisPriorityQueueV2) processTableStats(
sctx sessionctx.Context,
stats *statistics.Table,
parameters map[string]string,
) error {
// Check if the table is eligible for analysis first to avoid unnecessary work.
if !stats.IsEligibleForAnalysis() {
return nil
}

autoAnalyzeRatio := exec.ParseAutoAnalyzeRatio(parameters[variable.TiDBAutoAnalyzeRatio])
// Get current timestamp from the session context.
currentTs, err := getStartTs(sctx)
if err != nil {
return errors.Trace(err)
}
jobFactory := NewAnalysisJobFactory(sctx, autoAnalyzeRatio, currentTs)
// Check whether the table needs to be analyzed.
// Note: Unanalyzed tables will also be considered.
changePercent := jobFactory.CalculateChangePercentage(stats)
if changePercent == 0 {
return nil
}
is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema)
pruneMode := variable.PartitionPruneMode(sctx.GetSessionVars().PartitionPruneMode.Load())

var job AnalysisJob
job, ok, _ := pq.inner.GetByKey(stats.PhysicalID)
if !ok {
job = pq.tryCreateJob(is, stats, pruneMode, jobFactory)
} else {
job = pq.tryUpdateJob(is, stats, job, jobFactory)
}

// tryCreateJob and tryUpdateJob may return nil (for example, when the table info
// or schema name cannot be resolved); skip such tables instead of panicking below.
if job == nil {
return nil
}

// Update the weight of the job.
job.SetWeight(pq.calculator.CalculateWeight(job))
return pq.inner.Add(job)
}
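processTableStats above is essentially an upsert into a keyed priority queue: look the job up by table ID, create it if missing or refresh its indicators if present, recompute the weight, and push it back. A minimal sketch of that flow, with hypothetical types standing in for the real job and queue:

```go
package main

import "fmt"

type job struct {
	tableID       int64
	changePercent float64
	weight        float64
}

// keyedQueue stands in for the inner heap; a map is enough to show the
// get-by-key / add-or-replace contract.
type keyedQueue struct {
	jobs map[int64]*job
}

func (q *keyedQueue) getByKey(id int64) (*job, bool) { j, ok := q.jobs[id]; return j, ok }
func (q *keyedQueue) add(j *job)                     { q.jobs[j.tableID] = j }

func upsert(q *keyedQueue, tableID int64, changePercent float64) {
	j, ok := q.getByKey(tableID)
	if !ok {
		j = &job{tableID: tableID} // roughly "tryCreateJob"
	}
	j.changePercent = changePercent // roughly "tryUpdateJob": refresh indicators in place
	j.weight = changePercent * 10   // stand-in for calculator.CalculateWeight
	q.add(j)
}

func main() {
	q := &keyedQueue{jobs: map[int64]*job{}}
	upsert(q, 1, 0.3)
	upsert(q, 1, 0.8) // a later round of DML changes updates the same job
	fmt.Printf("%+v\n", *q.jobs[1])
}
```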

func (pq *AnalysisPriorityQueueV2) tryCreateJob(
is infoschema.InfoSchema,
stats *statistics.Table,
pruneMode variable.PartitionPruneMode,
jobFactory *AnalysisJobFactory,
) (job AnalysisJob) {
tableInfo, ok := pq.statsHandle.TableInfoByID(is, stats.PhysicalID)
if !ok {
statslogutil.StatsLogger().Warn(
"Table info not found for table id",
zap.Int64("tableID", stats.PhysicalID),
)
return nil
}
// Only dereference the table info once the lookup has succeeded.
tableMeta := tableInfo.Meta()
schemaName, ok := is.SchemaNameByTableID(tableMeta.ID)
if !ok {
statslogutil.StatsLogger().Warn(
"Schema name not found for table id",
zap.Int64("tableID", stats.PhysicalID),
)
return nil
}
partitionedTable := tableMeta.GetPartitionInfo()
if partitionedTable == nil {
job = jobFactory.CreateNonPartitionedTableAnalysisJob(
schemaName.O,
tableMeta,
stats,
)
} else {
partitionDefs := partitionedTable.Definitions
if pruneMode == variable.Static {
var partitionDef model.PartitionDefinition
// Find the specific partition definition.
for _, def := range partitionDefs {
if def.ID == stats.PhysicalID {
partitionDef = def
break
}
}
job = jobFactory.CreateStaticPartitionAnalysisJob(
schemaName.O,
tableMeta,
partitionDef.ID,
partitionDef.Name.O,
stats,
)
} else {
partitionStats := GetPartitionStats(pq.statsHandle, tableMeta, partitionDefs)
job = jobFactory.CreateDynamicPartitionedTableAnalysisJob(
schemaName.O,
tableMeta,
stats,
partitionStats,
)
}
}
return job
}

func (pq *AnalysisPriorityQueueV2) tryUpdateJob(
is infoschema.InfoSchema,
stats *statistics.Table,
oldJob AnalysisJob,
jobFactory *AnalysisJobFactory,
) AnalysisJob {
indicators := oldJob.GetIndicators()

// For a dynamic partitioned table, there is no way to update only the partitions that have changed,
// so we recreate the whole job instead.
if IsDynamicPartitionedTableAnalysisJob(oldJob) {
tableInfo, ok := pq.statsHandle.TableInfoByID(is, stats.PhysicalID)
if !ok {
statslogutil.StatsLogger().Warn(
"Table info not found during updating job",
zap.Int64("tableID", stats.PhysicalID),
zap.String("job", oldJob.String()),
)
return nil
}
// Only dereference the table info once the lookup has succeeded.
tableMeta := tableInfo.Meta()
partitionedTable := tableMeta.GetPartitionInfo()
partitionDefs := partitionedTable.Definitions
partitionStats := GetPartitionStats(pq.statsHandle, tableMeta, partitionDefs)
schemaName, ok := is.SchemaNameByTableID(tableMeta.ID)
if !ok {
statslogutil.StatsLogger().Warn(
"Schema name not found during updating job",
zap.Int64("tableID", stats.PhysicalID),
zap.String("job", oldJob.String()),
)
return nil
}
return jobFactory.CreateDynamicPartitionedTableAnalysisJob(
schemaName.O,
tableMeta,
stats,
partitionStats,
)
}
// Otherwise, we update the indicators of the job.
indicators.ChangePercentage = jobFactory.CalculateChangePercentage(stats)
indicators.TableSize = jobFactory.CalculateTableSize(stats)
oldJob.SetIndicators(indicators)
return oldJob
}

// GetLastFetchTimestamp returns the last fetch timestamp of DML updates.
// Only used for testing.
func (pq *AnalysisPriorityQueueV2) GetLastFetchTimestamp() uint64 {
return pq.lastDMLUpdateFetchTimestamp.Load()
}

// RefreshLastAnalysisDuration refreshes the last analysis duration of all jobs in the priority queue.
func (pq *AnalysisPriorityQueueV2) RefreshLastAnalysisDuration() {
if err := statsutil.CallWithSCtx(pq.statsHandle.SPool(), func(sctx sessionctx.Context) error {
81 changes: 81 additions & 0 deletions pkg/statistics/handle/autoanalyze/priorityqueue/queue_v2_test.go
@@ -157,3 +157,84 @@ func TestRefreshLastAnalysisDuration(t *testing.T) {
require.NotZero(t, updatedJob2.GetIndicators().LastAnalysisDuration)
require.NotEqual(t, oldLastAnalysisDuration2, updatedJob2.GetIndicators().LastAnalysisDuration)
}

func TestProcessDMLChanges(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
handle := dom.StatsHandle()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t1 (a int)")
tk.MustExec("create table t2 (a int)")
tk.MustExec("insert into t1 values (1)")
tk.MustExec("insert into t2 values (1)")
statistics.AutoAnalyzeMinCnt = 0
defer func() {
statistics.AutoAnalyzeMinCnt = 1000
}()

ctx := context.Background()
require.NoError(t, handle.DumpStatsDeltaToKV(true))
require.NoError(t, handle.Update(ctx, dom.InfoSchema()))
schema := pmodel.NewCIStr("test")
tbl1, err := dom.InfoSchema().TableByName(ctx, schema, pmodel.NewCIStr("t1"))
require.NoError(t, err)
tbl2, err := dom.InfoSchema().TableByName(ctx, schema, pmodel.NewCIStr("t2"))
require.NoError(t, err)

pq := priorityqueue.NewAnalysisPriorityQueueV2(handle)
defer pq.Close()
require.NoError(t, pq.Initialize())
lastFetchTime := pq.GetLastFetchTimestamp()
require.NotZero(t, lastFetchTime)

// Check current jobs.
job1, err := pq.Pop()
require.NoError(t, err)
require.Equal(t, tbl1.Meta().ID, job1.GetTableID())
job2, err := pq.Pop()
require.NoError(t, err)
require.Equal(t, tbl2.Meta().ID, job2.GetTableID())
tk.MustExec("analyze table t1")
tk.MustExec("analyze table t2")
require.NoError(t, handle.Update(ctx, dom.InfoSchema()))

// Insert 10 rows into t1.
tk.MustExec("insert into t1 values (2), (3), (4), (5), (6), (7), (8), (9), (10), (11)")
// Insert 2 rows into t2.
tk.MustExec("insert into t2 values (2), (3)")

// Dump the stats to kv.
require.NoError(t, handle.DumpStatsDeltaToKV(true))
require.NoError(t, handle.Update(ctx, dom.InfoSchema()))

// Process the DML changes.
pq.ProcessDMLChanges()
newLastFetchTime := pq.GetLastFetchTimestamp()
require.NotZero(t, newLastFetchTime)
require.NotEqual(t, lastFetchTime, newLastFetchTime)

// Check if the jobs have been updated.
updatedJob1, err := pq.Pop()
require.NoError(t, err)
require.NotZero(t, updatedJob1.GetWeight())
require.Equal(t, tbl1.Meta().ID, updatedJob1.GetTableID())
updatedJob2, err := pq.Pop()
require.NoError(t, err)
require.NotZero(t, updatedJob2.GetWeight())
require.Equal(t, tbl2.Meta().ID, updatedJob2.GetTableID())

// Insert more rows into t2.
tk.MustExec("insert into t2 values (4), (5), (6), (7), (8), (9), (10), (11), (12), (13)")
// Dump the stats to kv.
require.NoError(t, handle.DumpStatsDeltaToKV(true))
require.NoError(t, handle.Update(ctx, dom.InfoSchema()))

// Process the DML changes.
pq.ProcessDMLChanges()

// Check if the jobs have been updated.
updatedJob2, err = pq.Pop()
require.NoError(t, err)
require.NotZero(t, updatedJob2.GetWeight())
require.Equal(t, tbl2.Meta().ID, updatedJob2.GetTableID(), "t2 should have higher weight due to more DML changes")
}
5 changes: 3 additions & 2 deletions pkg/statistics/handle/cache/statscache.go
@@ -66,7 +66,7 @@ func NewStatsCacheImplForTest() (types.StatsCache, error) {
// Update reads stats meta from store and updates the stats map.
func (s *StatsCacheImpl) Update(ctx context.Context, is infoschema.InfoSchema) error {
start := time.Now()
-lastVersion := s.getLastVersion()
+lastVersion := s.GetNextCheckVersionWithOffset()
var (
rows []chunk.Row
err error
@@ -156,7 +156,8 @@ func (s *StatsCacheImpl) Update(ctx context.Context, is infoschema.InfoSchema) error {
return nil
}

-func (s *StatsCacheImpl) getLastVersion() uint64 {
+// GetNextCheckVersionWithOffset returns the last version with an offset applied,
+// so that stats updates committed slightly out of order are not missed.
+func (s *StatsCacheImpl) GetNextCheckVersionWithOffset() uint64 {
// Get the greatest version of the stats meta table.
lastVersion := s.MaxTableStatsVersion()
// We need this because for two tables, the smaller version may write later than the one with larger version.
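The comment in GetNextCheckVersionWithOffset hints at why the raw maximum version is not a safe watermark: a row with a smaller version can become visible only after a row with a larger version has already been read. Below is a toy illustration of the offset idea; the concrete offset value and arithmetic are assumptions for illustration, not the actual TiDB implementation.

```go
package main

import "fmt"

func main() {
	// Versions are TSO-like: higher means later. The cache has already observed
	// version 105, but a transaction that writes version 102 has not committed yet.
	maxSeen := uint64(105)
	lateWrite := uint64(102) // becomes visible only after maxSeen was read

	// Naive watermark: "fetch rows with version > 105" permanently misses 102.
	fmt.Println("naive watermark sees late write:", lateWrite > maxSeen)

	// Offset watermark: back off by a small window so the next fetch re-reads
	// the tail of the table and picks up the late row.
	const offset = uint64(10) // assumed value for illustration
	watermark := maxSeen - offset
	fmt.Println("offset watermark sees late write:", lateWrite > watermark)
}
```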
4 changes: 4 additions & 0 deletions pkg/statistics/handle/types/interfaces.go
@@ -185,6 +185,10 @@ type StatsCache interface {
// UpdateStatsCache updates the cache.
UpdateStatsCache(addedTables []*statistics.Table, deletedTableIDs []int64)

// GetNextCheckVersionWithOffset returns the last version with an offset applied.
// It is used to fetch updated statistics from the stats meta table.
GetNextCheckVersionWithOffset() uint64

// MaxTableStatsVersion returns the version of the current cache, which is defined as
// the max table stats version the cache has in its lifecycle.
MaxTableStatsVersion() uint64
