Skip to content

Commit

Permalink
statistics: upgrade stats timeout checkpoint after it timeouts (pingc…
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkingrei authored Sep 10, 2024
1 parent acc2a65 commit b8cfb3e
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 8 deletions.
1 change: 1 addition & 0 deletions statistics/handle/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ go_test(
"//domain",
"//parser/model",
"//parser/mysql",
"//sessionctx",
"//sessionctx/stmtctx",
"//sessionctx/variable",
"//statistics",
Expand Down
9 changes: 5 additions & 4 deletions statistics/handle/handle_hist.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func (h *Handle) SubLoadWorker(ctx sessionctx.Context, exit chan struct{}, exitW
// if the last task is not successfully handled in last round for error or panic, pass it to this round to retry
var lastTask *NeededItemTask
for {
task, err := h.HandleOneTask(lastTask, readerCtx, ctx.(sqlexec.RestrictedSQLExecutor), exit)
task, err := h.HandleOneTask(ctx, lastTask, readerCtx, ctx.(sqlexec.RestrictedSQLExecutor), exit)
lastTask = task
if err != nil {
switch err {
Expand All @@ -235,7 +235,7 @@ func (h *Handle) SubLoadWorker(ctx sessionctx.Context, exit chan struct{}, exitW
// - If the task is handled successfully, return nil, nil.
// - If the task is timeout, return the task and nil. The caller should retry the timeout task without sleep.
// - If the task is failed, return the task, error. The caller should retry the timeout task with sleep.
func (h *Handle) HandleOneTask(lastTask *NeededItemTask, readerCtx *StatsReaderContext, ctx sqlexec.RestrictedSQLExecutor, exit chan struct{}) (task *NeededItemTask, err error) {
func (h *Handle) HandleOneTask(sctx sessionctx.Context, lastTask *NeededItemTask, readerCtx *StatsReaderContext, ctx sqlexec.RestrictedSQLExecutor, exit chan struct{}) (task *NeededItemTask, err error) {
defer func() {
// recover for each task, worker keeps working
if r := recover(); r != nil {
Expand All @@ -244,7 +244,7 @@ func (h *Handle) HandleOneTask(lastTask *NeededItemTask, readerCtx *StatsReaderC
}
}()
if lastTask == nil {
task, err = h.drainColTask(exit)
task, err = h.drainColTask(sctx, exit)
if err != nil {
if err != errExit {
logutil.BgLogger().Error("Fail to drain task for stats loading.", zap.Error(err))
Expand Down Expand Up @@ -447,7 +447,7 @@ func (h *Handle) readStatsForOneItem(item model.TableItemID, w *statsWrapper, re
}

// drainColTask will hang until a column task can return, and either task or error will be returned.
func (h *Handle) drainColTask(exit chan struct{}) (*NeededItemTask, error) {
func (h *Handle) drainColTask(sctx sessionctx.Context, exit chan struct{}) (*NeededItemTask, error) {
// select NeededColumnsCh firstly, if no task, then select TimeoutColumnsCh
for {
select {
Expand All @@ -460,6 +460,7 @@ func (h *Handle) drainColTask(exit chan struct{}) (*NeededItemTask, error) {
// if the task has already timeout, no sql is sync-waiting for it,
// so do not handle it just now, put it to another channel with lower priority
if time.Now().After(task.ToTimeout) {
task.ToTimeout.Add(time.Duration(sctx.GetSessionVars().StatsLoadSyncWait.Load()) * time.Microsecond)
h.writeToTimeoutChan(h.StatsLoad.TimeoutItemsCh, task)
continue
}
Expand Down
11 changes: 7 additions & 4 deletions statistics/handle/handle_hist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/testkit"
Expand Down Expand Up @@ -205,7 +206,7 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) {
exitCh := make(chan struct{})
require.NoError(t, failpoint.Enable(fp.failPath, fp.inTerms))

task1, err1 := h.HandleOneTask(nil, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh)
task1, err1 := h.HandleOneTask(testKit.Session().(sessionctx.Context), nil, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh)
require.Error(t, err1)
require.NotNil(t, task1)
for _, resultCh := range stmtCtx1.StatsLoad.ResultCh {
Expand All @@ -226,7 +227,7 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) {
}

require.NoError(t, failpoint.Disable(fp.failPath))
task3, err3 := h.HandleOneTask(task1, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh)
task3, err3 := h.HandleOneTask(testKit.Session().(sessionctx.Context), task1, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh)
require.NoError(t, err3)
require.Nil(t, task3)

Expand Down Expand Up @@ -305,7 +306,8 @@ func TestRetry(t *testing.T) {
)
readerCtx := &handle.StatsReaderContext{}
for i := 0; i < handle.RetryCount; i++ {
task1, err1 = h.HandleOneTask(task1, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh)
task1, err1 = h.HandleOneTask(testKit.Session().(sessionctx.Context),
task1, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh)
require.Error(t, err1)
require.NotNil(t, task1)
select {
Expand All @@ -315,7 +317,8 @@ func TestRetry(t *testing.T) {
default:
}
}
result, err1 := h.HandleOneTask(task1, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh)
result, err1 := h.HandleOneTask(testKit.Session().(sessionctx.Context),
task1, readerCtx, testKit.Session().(sqlexec.RestrictedSQLExecutor), exitCh)
require.NoError(t, err1)
require.Nil(t, result)
for _, resultCh := range stmtCtx1.StatsLoad.ResultCh {
Expand Down

0 comments on commit b8cfb3e

Please sign in to comment.