Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

statistics: support merge global topn in concurrency (#38358) #38524

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions executor/analyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2605,3 +2605,78 @@ func TestAnalyzePartitionTableForFloat(t *testing.T) {
}
tk.MustExec("analyze table t1")
}

func TestAnalyzePartitionTableByConcurrencyInDynamic(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set @@tidb_partition_prune_mode='dynamic'")
tk.MustExec("use test")
tk.MustExec("create table t(id int) partition by hash(id) partitions 4")
testcases := []struct {
concurrency string
}{
{
concurrency: "1",
},
{
concurrency: "2",
},
{
concurrency: "3",
},
{
concurrency: "4",
},
{
concurrency: "5",
},
}
// assert empty table
for _, tc := range testcases {
concurrency := tc.concurrency
fmt.Println("testcase ", concurrency)
tk.MustExec(fmt.Sprintf("set @@tidb_merge_partition_stats_concurrency=%v", concurrency))
tk.MustQuery("select @@tidb_merge_partition_stats_concurrency").Check(testkit.Rows(concurrency))
tk.MustExec("analyze table t")
tk.MustQuery("show stats_topn where partition_name = 'global' and table_name = 't'")
}

for i := 1; i <= 500; i++ {
for j := 1; j <= 20; j++ {
tk.MustExec(fmt.Sprintf("insert into t (id) values (%v)", j))
}
}
var expected [][]interface{}
for i := 1; i <= 20; i++ {
expected = append(expected, []interface{}{
strconv.FormatInt(int64(i), 10), "500",
})
}
testcases = []struct {
concurrency string
}{
{
concurrency: "1",
},
{
concurrency: "2",
},
{
concurrency: "3",
},
{
concurrency: "4",
},
{
concurrency: "5",
},
}
for _, tc := range testcases {
concurrency := tc.concurrency
fmt.Println("testcase ", concurrency)
tk.MustExec(fmt.Sprintf("set @@tidb_merge_partition_stats_concurrency=%v", concurrency))
tk.MustQuery("select @@tidb_merge_partition_stats_concurrency").Check(testkit.Rows(concurrency))
tk.MustExec("analyze table t")
tk.MustQuery("show stats_topn where partition_name = 'global' and table_name = 't'").CheckAt([]int{5, 6}, expected)
}
}
113 changes: 113 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1002,6 +1002,119 @@ type SessionVars struct {

// StatsLoadSyncWait indicates how long to wait for stats load before timeout.
StatsLoadSyncWait int64
<<<<<<< HEAD
=======

// SysdateIsNow indicates whether Sysdate is an alias of Now function
SysdateIsNow bool
// EnableMutationChecker indicates whether to check data consistency for mutations
EnableMutationChecker bool
// AssertionLevel controls how strict the assertions on data mutations should be.
AssertionLevel AssertionLevel
// IgnorePreparedCacheCloseStmt controls if ignore the close-stmt command for prepared statement.
IgnorePreparedCacheCloseStmt bool
// EnableNewCostInterface is a internal switch to indicates whether to use the new cost calculation interface.
EnableNewCostInterface bool
// CostModelVersion is a internal switch to indicates the Cost Model Version.
CostModelVersion int
// BatchPendingTiFlashCount shows the threshold of pending TiFlash tables when batch adding.
BatchPendingTiFlashCount int
// RcWriteCheckTS indicates whether some special write statements don't get latest tso from PD at RC
RcWriteCheckTS bool
// RemoveOrderbyInSubquery indicates whether to remove ORDER BY in subquery.
RemoveOrderbyInSubquery bool
// NonTransactionalIgnoreError indicates whether to ignore error in non-transactional statements.
// When set to false, returns immediately when it meets the first error.
NonTransactionalIgnoreError bool

// MaxAllowedPacket indicates the maximum size of a packet for the MySQL protocol.
MaxAllowedPacket uint64

// TiFlash related optimization, only for MPP.
TiFlashFineGrainedShuffleStreamCount int64
TiFlashFineGrainedShuffleBatchSize uint64

// RequestSourceType is the type of inner request.
RequestSourceType string

// MemoryDebugModeMinHeapInUse indicated the minimum heapInUse threshold that triggers the memoryDebugMode.
MemoryDebugModeMinHeapInUse int64
// MemoryDebugModeAlarmRatio indicated the allowable bias ratio of memory tracking accuracy check.
// When `(memory trakced by tidb) * (1+MemoryDebugModeAlarmRatio) < actual heapInUse`, an alarm log will be recorded.
MemoryDebugModeAlarmRatio int64

// EnableAnalyzeSnapshot indicates whether to read data on snapshot when collecting statistics.
// When it is false, ANALYZE reads the latest data.
// When it is true, ANALYZE reads data on the snapshot at the beginning of ANALYZE.
EnableAnalyzeSnapshot bool

// DefaultStrMatchSelectivity adjust the estimation strategy for string matching expressions that can't be estimated by building into range.
// when > 0: it's the selectivity for the expression.
// when = 0: try to use TopN to evaluate the like expression to estimate the selectivity.
DefaultStrMatchSelectivity float64

// TiFlashFastScan indicates whether use fast scan in TiFlash
TiFlashFastScan bool

// PrimaryKeyRequired indicates if sql_require_primary_key sysvar is set
PrimaryKeyRequired bool

// EnablePreparedPlanCache indicates whether to enable prepared plan cache.
EnablePreparedPlanCache bool

// GeneralPlanCacheSize controls the size of general plan cache.
PreparedPlanCacheSize uint64

// EnableGeneralPlanCache indicates whether to enable general plan cache.
EnableGeneralPlanCache bool

// GeneralPlanCacheSize controls the size of general plan cache.
GeneralPlanCacheSize uint64

// ConstraintCheckInPlacePessimistic controls whether to skip the locking of some keys in pessimistic transactions.
// Postpone the conflict check and constraint check to prewrite or later pessimistic locking requests.
ConstraintCheckInPlacePessimistic bool

// EnableTiFlashReadForWriteStmt indicates whether to enable TiFlash to read for write statements.
EnableTiFlashReadForWriteStmt bool

// EnableUnsafeSubstitute indicates whether to enable generate column takes unsafe substitute.
EnableUnsafeSubstitute bool

// ForeignKeyChecks indicates whether to enable foreign key constraint check.
ForeignKeyChecks bool

// RangeMaxSize is the max memory limit for ranges. When the optimizer estimates that the memory usage of complete
// ranges would exceed the limit, it chooses less accurate ranges such as full range. 0 indicates that there is no
// memory limit for ranges.
RangeMaxSize int64

// LastPlanReplayerToken indicates the last plan replayer token
LastPlanReplayerToken string

// AnalyzePartitionMergeConcurrency indicates concurrency for merging partition stats
AnalyzePartitionMergeConcurrency int

HookContext
}

// GetPreparedStmtByName returns the prepared statement specified by stmtName.
func (s *SessionVars) GetPreparedStmtByName(stmtName string) (interface{}, error) {
stmtID, ok := s.PreparedStmtNameToID[stmtName]
if !ok {
return nil, ErrStmtNotFound
}
return s.GetPreparedStmtByID(stmtID)
}

// GetPreparedStmtByID returns the prepared statement specified by stmtID.
func (s *SessionVars) GetPreparedStmtByID(stmtID uint32) (interface{}, error) {
stmt, ok := s.PreparedStmts[stmtID]
if !ok {
return nil, ErrStmtNotFound
}
return stmt, nil
>>>>>>> e8d265981a (statistics: support merge global topn in concurrency (#38358))
}

// InitStatementContext initializes a StatementContext, the object is reused to reduce allocation.
Expand Down
91 changes: 91 additions & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -1403,6 +1403,97 @@ var defaultSysVars = []*SysVar{
return nil
},
},
<<<<<<< HEAD
=======
{Scope: ScopeGlobal | ScopeSession, Name: TiFlashFineGrainedShuffleStreamCount, Value: strconv.Itoa(DefTiFlashFineGrainedShuffleStreamCount), Type: TypeInt, MinValue: -1, MaxValue: 1024,
SetSession: func(s *SessionVars, val string) error {
s.TiFlashFineGrainedShuffleStreamCount = TidbOptInt64(val, DefTiFlashFineGrainedShuffleStreamCount)
return nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiFlashFineGrainedShuffleBatchSize, Value: strconv.Itoa(DefTiFlashFineGrainedShuffleBatchSize), Type: TypeUnsigned, MinValue: 1, MaxValue: math.MaxUint64,
SetSession: func(s *SessionVars, val string) error {
s.TiFlashFineGrainedShuffleBatchSize = uint64(TidbOptInt64(val, DefTiFlashFineGrainedShuffleBatchSize))
return nil
}},
{Scope: ScopeGlobal, Name: TiDBSimplifiedMetrics, Value: BoolToOnOff(DefTiDBSimplifiedMetrics), Type: TypeBool,
SetGlobal: func(_ context.Context, vars *SessionVars, s string) error {
metrics.ToggleSimplifiedMode(TiDBOptOn(s))
return nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBMinPagingSize, Value: strconv.Itoa(DefMinPagingSize), Type: TypeUnsigned, MinValue: 1, MaxValue: math.MaxInt64, SetSession: func(s *SessionVars, val string) error {
s.MinPagingSize = tidbOptPositiveInt32(val, DefMinPagingSize)
return nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBMaxPagingSize, Value: strconv.Itoa(DefMaxPagingSize), Type: TypeUnsigned, MinValue: 1, MaxValue: math.MaxInt64, SetSession: func(s *SessionVars, val string) error {
s.MaxPagingSize = tidbOptPositiveInt32(val, DefMaxPagingSize)
return nil
}},
{Scope: ScopeSession, Name: TiDBMemoryDebugModeMinHeapInUse, Value: strconv.Itoa(0), Type: TypeInt, MinValue: math.MinInt64, MaxValue: math.MaxInt64, SetSession: func(s *SessionVars, val string) error {
s.MemoryDebugModeMinHeapInUse = TidbOptInt64(val, 0)
return nil
}},
{Scope: ScopeSession, Name: TiDBMemoryDebugModeAlarmRatio, Value: strconv.Itoa(0), Type: TypeInt, MinValue: 0, MaxValue: math.MaxInt64, SetSession: func(s *SessionVars, val string) error {
s.MemoryDebugModeAlarmRatio = TidbOptInt64(val, 0)
return nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: SQLRequirePrimaryKey, Value: Off, Type: TypeBool, SetSession: func(s *SessionVars, val string) error {
s.PrimaryKeyRequired = TiDBOptOn(val)
return nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBEnableAnalyzeSnapshot, Value: BoolToOnOff(DefTiDBEnableAnalyzeSnapshot), Type: TypeBool, SetSession: func(s *SessionVars, val string) error {
s.EnableAnalyzeSnapshot = TiDBOptOn(val)
return nil
}},
{Scope: ScopeGlobal, Name: TiDBGenerateBinaryPlan, Value: BoolToOnOff(DefTiDBGenerateBinaryPlan), Type: TypeBool, SetGlobal: func(_ context.Context, s *SessionVars, val string) error {
GenerateBinaryPlan.Store(TiDBOptOn(val))
return nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBDefaultStrMatchSelectivity, Value: strconv.FormatFloat(DefTiDBDefaultStrMatchSelectivity, 'f', -1, 64), Type: TypeFloat, MinValue: 0, MaxValue: 1,
SetSession: func(s *SessionVars, val string) error {
s.DefaultStrMatchSelectivity = tidbOptFloat64(val, DefTiDBDefaultStrMatchSelectivity)
return nil
}},
{Scope: ScopeGlobal, Name: TiDBDDLEnableFastReorg, Value: BoolToOnOff(DefTiDBEnableFastReorg), Type: TypeBool, GetGlobal: func(_ context.Context, sv *SessionVars) (string, error) {
return BoolToOnOff(EnableFastReorg.Load()), nil
}, SetGlobal: func(_ context.Context, s *SessionVars, val string) error {
EnableFastReorg.Store(TiDBOptOn(val))
return nil
}},
// This system var is set disk quota for lightning sort dir, from 100 GB to 1PB.
{Scope: ScopeGlobal, Name: TiDBDDLDiskQuota, Value: strconv.Itoa(DefTiDBDDLDiskQuota), Type: TypeInt, MinValue: DefTiDBDDLDiskQuota, MaxValue: 1024 * 1024 * DefTiDBDDLDiskQuota / 100, GetGlobal: func(_ context.Context, sv *SessionVars) (string, error) {
return strconv.FormatUint(DDLDiskQuota.Load(), 10), nil
}, SetGlobal: func(_ context.Context, s *SessionVars, val string) error {
DDLDiskQuota.Store(TidbOptUint64(val, DefTiDBDDLDiskQuota))
return nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBConstraintCheckInPlacePessimistic, Value: BoolToOnOff(DefTiDBConstraintCheckInPlacePessimistic), Type: TypeBool,
SetSession: func(s *SessionVars, val string) error {
s.ConstraintCheckInPlacePessimistic = TiDBOptOn(val)
if !s.ConstraintCheckInPlacePessimistic {
metrics.LazyPessimisticUniqueCheckSetCount.Inc()
}
return nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBEnableTiFlashReadForWriteStmt, Value: BoolToOnOff(DefTiDBEnableTiFlashReadForWriteStmt), Type: TypeBool, SetSession: func(s *SessionVars, val string) error {
s.EnableTiFlashReadForWriteStmt = TiDBOptOn(val)
return nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBEnableUnsafeSubstitute, Value: BoolToOnOff(false), Type: TypeBool, SetSession: func(s *SessionVars, val string) error {
s.EnableUnsafeSubstitute = TiDBOptOn(val)
return nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBOptRangeMaxSize, Value: strconv.FormatInt(DefTiDBOptRangeMaxSize, 10), Type: TypeInt, MinValue: 0, MaxValue: math.MaxInt64, SetSession: func(s *SessionVars, val string) error {
s.RangeMaxSize = TidbOptInt64(val, DefTiDBOptRangeMaxSize)
return nil
}},
{
Scope: ScopeGlobal | ScopeSession, Name: TiDBMergePartitionStatsConcurrency, Value: strconv.FormatInt(DefTiDBMergePartitionStatsConcurrency, 10), Type: TypeInt, MinValue: 1, MaxValue: MaxConfigurableConcurrency,
SetSession: func(s *SessionVars, val string) error {
s.AnalyzePartitionMergeConcurrency = TidbOptInt(val, DefTiDBMergePartitionStatsConcurrency)
return nil
},
},
>>>>>>> e8d265981a (statistics: support merge global topn in concurrency (#38358))
}

// FeedbackProbability points to the FeedbackProbability in statistics package.
Expand Down
Loading