Skip to content

Commit

Permalink
session, statistics, util: fix data race of Handle.mu.ctx (#33732)
Browse files Browse the repository at this point in the history
ref #32822, close #32867, ref #33001
  • Loading branch information
xuyifangreeneyes authored Apr 13, 2022
1 parent 192482d commit 9c836a5
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 24 deletions.
12 changes: 9 additions & 3 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1670,6 +1670,10 @@ func (s *session) useCurrentSession(execOption sqlexec.ExecOption) (*session, fu
if execOption.AnalyzeVer != 0 {
s.sessionVars.AnalyzeVersion = execOption.AnalyzeVer
}
prePruneMode := s.sessionVars.PartitionPruneMode.Load()
if len(execOption.PartitionPruneMode) > 0 {
s.sessionVars.PartitionPruneMode.Store(execOption.PartitionPruneMode)
}
prevSQL := s.sessionVars.StmtCtx.OriginalSQL
prevStmtType := s.sessionVars.StmtCtx.StmtType
prevTables := s.sessionVars.StmtCtx.Tables
Expand All @@ -1679,6 +1683,7 @@ func (s *session) useCurrentSession(execOption sqlexec.ExecOption) (*session, fu
logutil.BgLogger().Error("set tidbSnapshot error", zap.Error(err))
}
s.sessionVars.SnapshotInfoschema = nil
s.sessionVars.PartitionPruneMode.Store(prePruneMode)
s.sessionVars.StmtCtx.OriginalSQL = prevSQL
s.sessionVars.StmtCtx.StmtType = prevStmtType
s.sessionVars.StmtCtx.Tables = prevTables
Expand All @@ -1700,7 +1705,6 @@ func (s *session) getInternalSession(execOption sqlexec.ExecOption) (*session, f
if ok := s.sessionVars.OptimizerUseInvisibleIndexes; ok {
se.sessionVars.OptimizerUseInvisibleIndexes = true
}
prePruneMode := se.sessionVars.PartitionPruneMode.Load()

if execOption.SnapshotTS != 0 {
se.sessionVars.SnapshotInfoschema, err = getSnapshotInfoSchema(s, execOption.SnapshotTS)
Expand All @@ -1717,8 +1721,10 @@ func (s *session) getInternalSession(execOption sqlexec.ExecOption) (*session, f
se.sessionVars.AnalyzeVersion = execOption.AnalyzeVer
}

// for analyze stmt we need let worker session follow user session that executing stmt.
se.sessionVars.PartitionPruneMode.Store(s.sessionVars.PartitionPruneMode.Load())
prePruneMode := se.sessionVars.PartitionPruneMode.Load()
if len(execOption.PartitionPruneMode) > 0 {
se.sessionVars.PartitionPruneMode.Store(execOption.PartitionPruneMode)
}

// Put the internal session to the map of SessionManager
infosync.StoreInternalSession(se)
Expand Down
43 changes: 30 additions & 13 deletions statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ type Handle struct {
memTracker *memory.Tracker
}

// Deprecated: only used by feedback now
pool sessionPool

// ddlEventCh is a channel to notify a ddl operation has happened.
Expand Down Expand Up @@ -129,25 +128,43 @@ type Handle struct {
sysProcTracker sessionctx.SysProcTracker
}

func (h *Handle) withRestrictedSQLExecutor(ctx context.Context, fn func(context.Context, sqlexec.RestrictedSQLExecutor) ([]chunk.Row, []*ast.ResultField, error)) ([]chunk.Row, []*ast.ResultField, error) {
se, err := h.pool.Get()
if err != nil {
return nil, nil, errors.Trace(err)
}
defer h.pool.Put(se)

exec := se.(sqlexec.RestrictedSQLExecutor)
return fn(ctx, exec)
}

func (h *Handle) execRestrictedSQL(ctx context.Context, sql string, params ...interface{}) ([]chunk.Row, []*ast.ResultField, error) {
return h.mu.ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseSessionPool}, sql, params...)
return h.withRestrictedSQLExecutor(ctx, func(ctx context.Context, exec sqlexec.RestrictedSQLExecutor) ([]chunk.Row, []*ast.ResultField, error) {
return exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, sql, params...)
})
}

func (h *Handle) execRestrictedSQLWithStatsVer(ctx context.Context, statsVer int, procTrackID uint64, sql string, params ...interface{}) ([]chunk.Row, []*ast.ResultField, error) {
optFuncs := []sqlexec.OptionFuncAlias{
sqlexec.ExecOptionUseSessionPool,
execOptionForAnalyze[statsVer],
sqlexec.ExecOptionWithSysProcTrack(procTrackID, h.sysProcTracker.Track, h.sysProcTracker.UnTrack),
}
return h.mu.ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(ctx, optFuncs, sql, params...)
return h.withRestrictedSQLExecutor(ctx, func(ctx context.Context, exec sqlexec.RestrictedSQLExecutor) ([]chunk.Row, []*ast.ResultField, error) {
optFuncs := []sqlexec.OptionFuncAlias{
execOptionForAnalyze[statsVer],
sqlexec.GetPartitionPruneModeOption(string(h.CurrentPruneMode())),
sqlexec.ExecOptionUseCurSession,
sqlexec.ExecOptionWithSysProcTrack(procTrackID, h.sysProcTracker.Track, h.sysProcTracker.UnTrack),
}
return exec.ExecRestrictedSQL(ctx, optFuncs, sql, params...)
})
}

func (h *Handle) execRestrictedSQLWithSnapshot(ctx context.Context, sql string, snapshot uint64, params ...interface{}) ([]chunk.Row, []*ast.ResultField, error) {
optFuncs := []sqlexec.OptionFuncAlias{
sqlexec.ExecOptionUseSessionPool,
sqlexec.ExecOptionWithSnapshot(snapshot),
}
return h.mu.ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(ctx, optFuncs, sql, params...)
return h.withRestrictedSQLExecutor(ctx, func(ctx context.Context, exec sqlexec.RestrictedSQLExecutor) ([]chunk.Row, []*ast.ResultField, error) {
optFuncs := []sqlexec.OptionFuncAlias{
sqlexec.ExecOptionWithSnapshot(snapshot),
sqlexec.ExecOptionUseCurSession,
}
return exec.ExecRestrictedSQL(ctx, optFuncs, sql, params...)
})
}

// Clear the statsCache, only for test.
Expand Down
23 changes: 15 additions & 8 deletions util/sqlexec/restricted_sql_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,14 @@ type RestrictedSQLExecutor interface {

// ExecOption is a struct defined for ExecRestrictedStmt/SQL option.
type ExecOption struct {
IgnoreWarning bool
SnapshotTS uint64
AnalyzeVer int
UseCurSession bool
TrackSysProcID uint64
TrackSysProc func(id uint64, ctx sessionctx.Context) error
UnTrackSysProc func(id uint64)
IgnoreWarning bool
SnapshotTS uint64
AnalyzeVer int
PartitionPruneMode string
UseCurSession bool
TrackSysProcID uint64
TrackSysProc func(id uint64, ctx sessionctx.Context) error
UnTrackSysProc func(id uint64)
}

// OptionFuncAlias is defined for the optional parameter of ExecRestrictedStmt/SQL.
Expand All @@ -78,11 +79,17 @@ var ExecOptionAnalyzeVer1 OptionFuncAlias = func(option *ExecOption) {
}

// ExecOptionAnalyzeVer2 tells ExecRestrictedStmt/SQL to collect statistics with version2.
// ExecOptionAnalyzeVer2 tells ExecRestrictedStmt to collect statistics with version2.
var ExecOptionAnalyzeVer2 OptionFuncAlias = func(option *ExecOption) {
option.AnalyzeVer = 2
}

// GetPartitionPruneModeOption returns a function which tells ExecRestrictedStmt/SQL to run with pruneMode.
func GetPartitionPruneModeOption(pruneMode string) OptionFuncAlias {
return func(option *ExecOption) {
option.PartitionPruneMode = pruneMode
}
}

// ExecOptionUseCurSession tells ExecRestrictedStmt/SQL to use current session.
var ExecOptionUseCurSession OptionFuncAlias = func(option *ExecOption) {
option.UseCurSession = true
Expand Down

0 comments on commit 9c836a5

Please sign in to comment.