diff --git a/executor/analyze_test.go b/executor/analyze_test.go
index 6e0374c72ca8c..561825e55cdf8 100644
--- a/executor/analyze_test.go
+++ b/executor/analyze_test.go
@@ -2605,3 +2605,78 @@ func TestAnalyzePartitionTableForFloat(t *testing.T) {
 	}
 	tk.MustExec("analyze table t1")
 }
+
+func TestAnalyzePartitionTableByConcurrencyInDynamic(t *testing.T) {
+	store := testkit.CreateMockStore(t)
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("set @@tidb_partition_prune_mode='dynamic'")
+	tk.MustExec("use test")
+	tk.MustExec("create table t(id int) partition by hash(id) partitions 4")
+	testcases := []struct {
+		concurrency string
+	}{
+		{
+			concurrency: "1",
+		},
+		{
+			concurrency: "2",
+		},
+		{
+			concurrency: "3",
+		},
+		{
+			concurrency: "4",
+		},
+		{
+			concurrency: "5",
+		},
+	}
+	// assert empty table
+	for _, tc := range testcases {
+		concurrency := tc.concurrency
+		fmt.Println("testcase ", concurrency)
+		tk.MustExec(fmt.Sprintf("set @@tidb_merge_partition_stats_concurrency=%v", concurrency))
+		tk.MustQuery("select @@tidb_merge_partition_stats_concurrency").Check(testkit.Rows(concurrency))
+		tk.MustExec("analyze table t")
+		tk.MustQuery("show stats_topn where partition_name = 'global' and table_name = 't'")
+	}
+
+	for i := 1; i <= 500; i++ {
+		for j := 1; j <= 20; j++ {
+			tk.MustExec(fmt.Sprintf("insert into t (id) values (%v)", j))
+		}
+	}
+	var expected [][]interface{}
+	for i := 1; i <= 20; i++ {
+		expected = append(expected, []interface{}{
+			strconv.FormatInt(int64(i), 10), "500",
+		})
+	}
+	testcases = []struct {
+		concurrency string
+	}{
+		{
+			concurrency: "1",
+		},
+		{
+			concurrency: "2",
+		},
+		{
+			concurrency: "3",
+		},
+		{
+			concurrency: "4",
+		},
+		{
+			concurrency: "5",
+		},
+	}
+	for _, tc := range testcases {
+		concurrency := tc.concurrency
+		fmt.Println("testcase ", concurrency)
+		tk.MustExec(fmt.Sprintf("set @@tidb_merge_partition_stats_concurrency=%v", concurrency))
+		tk.MustQuery("select @@tidb_merge_partition_stats_concurrency").Check(testkit.Rows(concurrency))
+		tk.MustExec("analyze table t")
+		tk.MustQuery("show stats_topn where partition_name = 'global' and table_name = 't'").CheckAt([]int{5, 6}, expected)
+	}
+}
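An editorial aside, not part of the patch: the two `testcases` literals in the new test are identical, and the same coverage could be driven from a single slice of values. A minimal stand-alone sketch of that loop shape (`mustExec` is a hypothetical stand-in for `tk.MustExec`, not the real testkit harness):

```go
package main

import "fmt"

// mustExec stands in for tk.MustExec; only the loop shape is illustrated here.
func mustExec(sql string) { fmt.Println("exec:", sql) }

func main() {
	// One slice of concurrency values can drive both the empty-table round
	// and the populated-table round, replacing the duplicated literals.
	for _, concurrency := range []string{"1", "2", "3", "4", "5"} {
		mustExec(fmt.Sprintf("set @@tidb_merge_partition_stats_concurrency=%v", concurrency))
		mustExec("analyze table t")
	}
}
```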
diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go
index ae6d14df23f6b..bc19247ec8712 100644
--- a/sessionctx/variable/session.go
+++ b/sessionctx/variable/session.go
@@ -1002,6 +1002,119 @@ type SessionVars struct {
 	// StatsLoadSyncWait indicates how long to wait for stats load before timeout.
 	StatsLoadSyncWait int64
+<<<<<<< HEAD
+=======
+
+	// SysdateIsNow indicates whether Sysdate is an alias of the Now function
+	SysdateIsNow bool
+	// EnableMutationChecker indicates whether to check data consistency for mutations
+	EnableMutationChecker bool
+	// AssertionLevel controls how strict the assertions on data mutations should be.
+	AssertionLevel AssertionLevel
+	// IgnorePreparedCacheCloseStmt controls whether to ignore the close-stmt command for prepared statements.
+	IgnorePreparedCacheCloseStmt bool
+	// EnableNewCostInterface is an internal switch that indicates whether to use the new cost calculation interface.
+	EnableNewCostInterface bool
+	// CostModelVersion is an internal switch that indicates the Cost Model Version.
+	CostModelVersion int
+	// BatchPendingTiFlashCount shows the threshold of pending TiFlash tables when batch adding.
+	BatchPendingTiFlashCount int
+	// RcWriteCheckTS indicates whether some special write statements don't get the latest tso from PD at RC
+	RcWriteCheckTS bool
+	// RemoveOrderbyInSubquery indicates whether to remove ORDER BY in subqueries.
+	RemoveOrderbyInSubquery bool
+	// NonTransactionalIgnoreError indicates whether to ignore errors in non-transactional statements.
+	// When set to false, the statement returns immediately when it meets the first error.
+	NonTransactionalIgnoreError bool
+
+	// MaxAllowedPacket indicates the maximum size of a packet for the MySQL protocol.
+	MaxAllowedPacket uint64
+
+	// TiFlash related optimization, only for MPP.
+	TiFlashFineGrainedShuffleStreamCount int64
+	TiFlashFineGrainedShuffleBatchSize   uint64
+
+	// RequestSourceType is the type of inner request.
+	RequestSourceType string
+
+	// MemoryDebugModeMinHeapInUse indicates the minimum heapInUse threshold that triggers the memoryDebugMode.
+	MemoryDebugModeMinHeapInUse int64
+	// MemoryDebugModeAlarmRatio indicates the allowable bias ratio of the memory tracking accuracy check.
+	// When `(memory tracked by tidb) * (1+MemoryDebugModeAlarmRatio) < actual heapInUse`, an alarm log will be recorded.
+	MemoryDebugModeAlarmRatio int64
+
+	// EnableAnalyzeSnapshot indicates whether to read data on snapshot when collecting statistics.
+	// When it is false, ANALYZE reads the latest data.
+	// When it is true, ANALYZE reads data on the snapshot at the beginning of ANALYZE.
+	EnableAnalyzeSnapshot bool
+
+	// DefaultStrMatchSelectivity adjusts the estimation strategy for string matching expressions that can't be estimated by building into range.
+	// when > 0: it's the selectivity for the expression.
+	// when = 0: try to use TopN to evaluate the like expression to estimate the selectivity.
+	DefaultStrMatchSelectivity float64
+
+	// TiFlashFastScan indicates whether to use fast scan in TiFlash.
+	TiFlashFastScan bool
+
+	// PrimaryKeyRequired indicates if the sql_require_primary_key sysvar is set
+	PrimaryKeyRequired bool
+
+	// EnablePreparedPlanCache indicates whether to enable prepared plan cache.
+	EnablePreparedPlanCache bool
+
+	// PreparedPlanCacheSize controls the size of the prepared plan cache.
+	PreparedPlanCacheSize uint64
+
+	// EnableGeneralPlanCache indicates whether to enable general plan cache.
+	EnableGeneralPlanCache bool
+
+	// GeneralPlanCacheSize controls the size of the general plan cache.
+	GeneralPlanCacheSize uint64
+
+	// ConstraintCheckInPlacePessimistic controls whether to skip the locking of some keys in pessimistic transactions.
+	// Postpone the conflict check and constraint check to prewrite or later pessimistic locking requests.
+	ConstraintCheckInPlacePessimistic bool
+
+	// EnableTiFlashReadForWriteStmt indicates whether to enable TiFlash to read for write statements.
+	EnableTiFlashReadForWriteStmt bool
+
+	// EnableUnsafeSubstitute indicates whether to allow generated columns to take unsafe substitutes.
+	EnableUnsafeSubstitute bool
+
+	// ForeignKeyChecks indicates whether to enable foreign key constraint check.
+	ForeignKeyChecks bool
+
+	// RangeMaxSize is the max memory limit for ranges. When the optimizer estimates that the memory usage of complete
+	// ranges would exceed the limit, it chooses less accurate ranges such as full range. 0 indicates that there is no
+	// memory limit for ranges.
+	RangeMaxSize int64
+
+	// LastPlanReplayerToken indicates the last plan replayer token
+	LastPlanReplayerToken string
+
+	// AnalyzePartitionMergeConcurrency indicates concurrency for merging partition stats
+	AnalyzePartitionMergeConcurrency int
+
+	HookContext
+}
+
+// GetPreparedStmtByName returns the prepared statement specified by stmtName.
+func (s *SessionVars) GetPreparedStmtByName(stmtName string) (interface{}, error) {
+	stmtID, ok := s.PreparedStmtNameToID[stmtName]
+	if !ok {
+		return nil, ErrStmtNotFound
+	}
+	return s.GetPreparedStmtByID(stmtID)
+}
+
+// GetPreparedStmtByID returns the prepared statement specified by stmtID.
+func (s *SessionVars) GetPreparedStmtByID(stmtID uint32) (interface{}, error) {
+	stmt, ok := s.PreparedStmts[stmtID]
+	if !ok {
+		return nil, ErrStmtNotFound
+	}
+	return stmt, nil
+>>>>>>> e8d265981a (statistics: support merge global topn in concurrency (#38358))
 }
 
 // InitStatementContext initializes a StatementContext, the object is reused to reduce allocation.
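A hedged usage sketch of the two lookup helpers added above. It assumes the exported PreparedStmtNameToID and PreparedStmts maps may be populated directly for illustration, and it only compiles inside a module that can import github.com/pingcap/tidb:

```go
package main

import (
	"fmt"

	"github.com/pingcap/tidb/sessionctx/variable"
)

func main() {
	// Hypothetical setup: in real usage these maps are filled by PREPARE.
	s := &variable.SessionVars{
		PreparedStmtNameToID: map[string]uint32{"stmt1": 1},
		PreparedStmts:        map[uint32]interface{}{1: "SELECT 1"},
	}
	// Name -> ID -> statement; a missing name yields ErrStmtNotFound.
	stmt, err := s.GetPreparedStmtByName("stmt1")
	fmt.Println(stmt, err)
}
```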
diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go
index 11bb4b6adac0a..2b1a1cbe71f17 100644
--- a/sessionctx/variable/sysvar.go
+++ b/sessionctx/variable/sysvar.go
@@ -1403,6 +1403,97 @@ var defaultSysVars = []*SysVar{
 			return nil
 		},
 	},
+<<<<<<< HEAD
+=======
+	{Scope: ScopeGlobal | ScopeSession, Name: TiFlashFineGrainedShuffleStreamCount, Value: strconv.Itoa(DefTiFlashFineGrainedShuffleStreamCount), Type: TypeInt, MinValue: -1, MaxValue: 1024,
+		SetSession: func(s *SessionVars, val string) error {
+			s.TiFlashFineGrainedShuffleStreamCount = TidbOptInt64(val, DefTiFlashFineGrainedShuffleStreamCount)
+			return nil
+		}},
+	{Scope: ScopeGlobal | ScopeSession, Name: TiFlashFineGrainedShuffleBatchSize, Value: strconv.Itoa(DefTiFlashFineGrainedShuffleBatchSize), Type: TypeUnsigned, MinValue: 1, MaxValue: math.MaxUint64,
+		SetSession: func(s *SessionVars, val string) error {
+			s.TiFlashFineGrainedShuffleBatchSize = uint64(TidbOptInt64(val, DefTiFlashFineGrainedShuffleBatchSize))
+			return nil
+		}},
+	{Scope: ScopeGlobal, Name: TiDBSimplifiedMetrics, Value: BoolToOnOff(DefTiDBSimplifiedMetrics), Type: TypeBool,
+		SetGlobal: func(_ context.Context, vars *SessionVars, s string) error {
+			metrics.ToggleSimplifiedMode(TiDBOptOn(s))
+			return nil
+		}},
+	{Scope: ScopeGlobal | ScopeSession, Name: TiDBMinPagingSize, Value: strconv.Itoa(DefMinPagingSize), Type: TypeUnsigned, MinValue: 1, MaxValue: math.MaxInt64, SetSession: func(s *SessionVars, val string) error {
+		s.MinPagingSize = tidbOptPositiveInt32(val, DefMinPagingSize)
+		return nil
+	}},
+	{Scope: ScopeGlobal | ScopeSession, Name: TiDBMaxPagingSize, Value: strconv.Itoa(DefMaxPagingSize), Type: TypeUnsigned, MinValue: 1, MaxValue: math.MaxInt64, SetSession: func(s *SessionVars, val string) error {
+		s.MaxPagingSize = tidbOptPositiveInt32(val, DefMaxPagingSize)
+		return nil
+	}},
+	{Scope: ScopeSession, Name: TiDBMemoryDebugModeMinHeapInUse, Value: strconv.Itoa(0), Type: TypeInt, MinValue: math.MinInt64, MaxValue: math.MaxInt64, SetSession: func(s *SessionVars, val string) error {
+		s.MemoryDebugModeMinHeapInUse = TidbOptInt64(val, 0)
+		return nil
+	}},
+	{Scope: ScopeSession, Name: TiDBMemoryDebugModeAlarmRatio, Value: strconv.Itoa(0), Type: TypeInt, MinValue: 0, MaxValue: math.MaxInt64, SetSession: func(s *SessionVars, val string) error {
+		s.MemoryDebugModeAlarmRatio = TidbOptInt64(val, 0)
+		return nil
+	}},
+	{Scope: ScopeGlobal | ScopeSession, Name: SQLRequirePrimaryKey, Value: Off, Type: TypeBool, SetSession: func(s *SessionVars, val string) error {
+		s.PrimaryKeyRequired = TiDBOptOn(val)
+		return nil
+	}},
+	{Scope: ScopeGlobal | ScopeSession, Name: TiDBEnableAnalyzeSnapshot, Value: BoolToOnOff(DefTiDBEnableAnalyzeSnapshot), Type: TypeBool, SetSession: func(s *SessionVars, val string) error {
+		s.EnableAnalyzeSnapshot = TiDBOptOn(val)
+		return nil
+	}},
+	{Scope: ScopeGlobal, Name: TiDBGenerateBinaryPlan, Value: BoolToOnOff(DefTiDBGenerateBinaryPlan), Type: TypeBool, SetGlobal: func(_ context.Context, s *SessionVars, val string) error {
+		GenerateBinaryPlan.Store(TiDBOptOn(val))
+		return nil
+	}},
+	{Scope: ScopeGlobal | ScopeSession, Name: TiDBDefaultStrMatchSelectivity, Value: strconv.FormatFloat(DefTiDBDefaultStrMatchSelectivity, 'f', -1, 64), Type: TypeFloat, MinValue: 0, MaxValue: 1,
+		SetSession: func(s *SessionVars, val string) error {
+			s.DefaultStrMatchSelectivity = tidbOptFloat64(val, DefTiDBDefaultStrMatchSelectivity)
+			return nil
+		}},
+	{Scope: ScopeGlobal, Name: TiDBDDLEnableFastReorg, Value: BoolToOnOff(DefTiDBEnableFastReorg), Type: TypeBool, GetGlobal: func(_ context.Context, sv *SessionVars) (string, error) {
+		return BoolToOnOff(EnableFastReorg.Load()), nil
+	}, SetGlobal: func(_ context.Context, s *SessionVars, val string) error {
+		EnableFastReorg.Store(TiDBOptOn(val))
+		return nil
+	}},
+	// This system variable sets the disk quota for the lightning sort dir, from 100 GB to 1 PB.
+	{Scope: ScopeGlobal, Name: TiDBDDLDiskQuota, Value: strconv.Itoa(DefTiDBDDLDiskQuota), Type: TypeInt, MinValue: DefTiDBDDLDiskQuota, MaxValue: 1024 * 1024 * DefTiDBDDLDiskQuota / 100, GetGlobal: func(_ context.Context, sv *SessionVars) (string, error) {
+		return strconv.FormatUint(DDLDiskQuota.Load(), 10), nil
+	}, SetGlobal: func(_ context.Context, s *SessionVars, val string) error {
+		DDLDiskQuota.Store(TidbOptUint64(val, DefTiDBDDLDiskQuota))
+		return nil
+	}},
+	{Scope: ScopeGlobal | ScopeSession, Name: TiDBConstraintCheckInPlacePessimistic, Value: BoolToOnOff(DefTiDBConstraintCheckInPlacePessimistic), Type: TypeBool,
+		SetSession: func(s *SessionVars, val string) error {
+			s.ConstraintCheckInPlacePessimistic = TiDBOptOn(val)
+			if !s.ConstraintCheckInPlacePessimistic {
+				metrics.LazyPessimisticUniqueCheckSetCount.Inc()
+			}
+			return nil
+		}},
+	{Scope: ScopeGlobal | ScopeSession, Name: TiDBEnableTiFlashReadForWriteStmt, Value: BoolToOnOff(DefTiDBEnableTiFlashReadForWriteStmt), Type: TypeBool, SetSession: func(s *SessionVars, val string) error {
+		s.EnableTiFlashReadForWriteStmt = TiDBOptOn(val)
+		return nil
+	}},
+	{Scope: ScopeGlobal | ScopeSession, Name: TiDBEnableUnsafeSubstitute, Value: BoolToOnOff(false), Type: TypeBool, SetSession: func(s *SessionVars, val string) error {
+		s.EnableUnsafeSubstitute = TiDBOptOn(val)
+		return nil
+	}},
+	{Scope: ScopeGlobal | ScopeSession, Name: TiDBOptRangeMaxSize, Value: strconv.FormatInt(DefTiDBOptRangeMaxSize, 10), Type: TypeInt, MinValue: 0, MaxValue: math.MaxInt64, SetSession: func(s *SessionVars, val string) error {
+		s.RangeMaxSize = TidbOptInt64(val, DefTiDBOptRangeMaxSize)
+		return nil
+	}},
+	{
+		Scope: ScopeGlobal | ScopeSession, Name: TiDBMergePartitionStatsConcurrency, Value: strconv.FormatInt(DefTiDBMergePartitionStatsConcurrency, 10), Type: TypeInt, MinValue: 1, MaxValue: MaxConfigurableConcurrency,
+		SetSession: func(s *SessionVars, val string) error {
+			s.AnalyzePartitionMergeConcurrency = TidbOptInt(val, DefTiDBMergePartitionStatsConcurrency)
+			return nil
+		},
+	},
+>>>>>>> e8d265981a (statistics: support merge global topn in concurrency (#38358))
 }
 
 // FeedbackProbability points to the FeedbackProbability in statistics package.
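The new tidb_merge_partition_stats_concurrency entry above is a TypeInt variable bounded by MinValue 1 and MaxValue MaxConfigurableConcurrency, and its SetSession hook parses the value with TidbOptInt. As a self-contained illustration of that parse-with-fallback idea (optInt is a stand-in, not TiDB's actual helper; the range check itself is performed by the sysvar framework before SetSession runs):

```go
package main

import (
	"fmt"
	"strconv"
)

// optInt parses val and falls back to def when parsing fails, mimicking
// the role TidbOptInt plays in the SetSession hook above.
func optInt(val string, def int) int {
	n, err := strconv.Atoi(val)
	if err != nil {
		return def
	}
	return n
}

func main() {
	const defMergeConcurrency = 1 // DefTiDBMergePartitionStatsConcurrency
	fmt.Println(optInt("4", defMergeConcurrency))    // 4
	fmt.Println(optInt("oops", defMergeConcurrency)) // falls back to 1
}
```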
diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go
index 5f5b42cd66fd7..e4b206e9ffe28 100644
--- a/sessionctx/variable/tidb_vars.go
+++ b/sessionctx/variable/tidb_vars.go
@@ -612,6 +612,92 @@ const (
 	// TiDBStatsLoadSyncWait indicates the time sql execution will sync-wait for stats load.
 	TiDBStatsLoadSyncWait = "tidb_stats_load_sync_wait"
+<<<<<<< HEAD
+=======
+
+	// TiDBEnableMutationChecker indicates whether to check data consistency for mutations
+	TiDBEnableMutationChecker = "tidb_enable_mutation_checker"
+	// TiDBTxnAssertionLevel indicates how strict the assertion will be, which helps to detect and prevent data &
+	// index inconsistency problems.
+	TiDBTxnAssertionLevel = "tidb_txn_assertion_level"
+
+	// TiDBIgnorePreparedCacheCloseStmt indicates whether to ignore close-stmt commands for prepared statements.
+	TiDBIgnorePreparedCacheCloseStmt = "tidb_ignore_prepared_cache_close_stmt"
+
+	// TiDBEnableNewCostInterface is an internal switch that indicates whether to use the new cost calculation interface.
+	TiDBEnableNewCostInterface = "tidb_enable_new_cost_interface"
+
+	// TiDBCostModelVersion is an internal switch that indicates the cost model version.
+	TiDBCostModelVersion = "tidb_cost_model_version"
+
+	// TiDBBatchPendingTiFlashCount indicates the maximum count of non-available TiFlash tables.
+	TiDBBatchPendingTiFlashCount = "tidb_batch_pending_tiflash_count"
+
+	// TiDBQueryLogMaxLen is used to set the max length of the query in the log.
+	TiDBQueryLogMaxLen = "tidb_query_log_max_len"
+
+	// TiDBEnableNoopVariables is used to indicate if noops appear in SHOW [GLOBAL] VARIABLES
+	TiDBEnableNoopVariables = "tidb_enable_noop_variables"
+
+	// TiDBNonTransactionalIgnoreError is used to ignore errors in non-transactional DMLs.
+	// When set to false, a non-transactional DML returns when it meets the first error.
+	// When set to true, a non-transactional DML finishes all batches even if errors are met in some batches.
+	TiDBNonTransactionalIgnoreError = "tidb_nontransactional_ignore_error"
+
+	// Fine grained shuffle is disabled when TiFlashFineGrainedShuffleStreamCount is zero.
+	TiFlashFineGrainedShuffleStreamCount = "tiflash_fine_grained_shuffle_stream_count"
+	TiFlashFineGrainedShuffleBatchSize   = "tiflash_fine_grained_shuffle_batch_size"
+
+	// TiDBSimplifiedMetrics controls whether to unregister some unused metrics.
+	TiDBSimplifiedMetrics = "tidb_simplified_metrics"
+
+	// TiDBMemoryDebugModeMinHeapInUse is used to set the tidb memory debug mode trigger threshold.
+	// When set to 0, the function is disabled.
+	// When set to a negative integer, use memory debug mode to detect the issue of frequent allocation and release of memory.
+	// We do not actively trigger gc, and check whether `tracker memory * (1+bias ratio) > heap in use` every 5s.
+	// When set to a positive integer, use memory debug mode to detect the issue of inaccurate memory tracking.
+	// We trigger runtime.GC() every 5s, and check whether `tracker memory * (1+bias ratio) > heap in use`.
+	TiDBMemoryDebugModeMinHeapInUse = "tidb_memory_debug_mode_min_heap_inuse"
+	// TiDBMemoryDebugModeAlarmRatio is used to set the tidb memory debug mode bias ratio. Treat memory bias less than this ratio as noise.
+	TiDBMemoryDebugModeAlarmRatio = "tidb_memory_debug_mode_alarm_ratio"
+
+	// TiDBEnableAnalyzeSnapshot indicates whether to read data on snapshot when collecting statistics.
+	// When set to false, ANALYZE reads the latest data.
+	// When set to true, ANALYZE reads data on the snapshot at the beginning of ANALYZE.
+	TiDBEnableAnalyzeSnapshot = "tidb_enable_analyze_snapshot"
+
+	// TiDBDefaultStrMatchSelectivity controls some special cardinality estimation strategy for string match functions (like and regexp).
+	// When set to 0, Selectivity() will try to evaluate those functions with TopN and NULL in the stats to estimate,
+	// and the default selectivity and the selectivity for the histogram part will be 0.1.
+	// When set to (0, 1], Selectivity() will use the value of this variable as the default selectivity of those
+	// functions instead of the selectionFactor (0.8).
+	TiDBDefaultStrMatchSelectivity = "tidb_default_string_match_selectivity"
+
+	// TiDBEnablePrepPlanCache indicates whether to enable prepared plan cache
+	TiDBEnablePrepPlanCache = "tidb_enable_prepared_plan_cache"
+	// TiDBPrepPlanCacheSize indicates the number of cached statements.
+	TiDBPrepPlanCacheSize = "tidb_prepared_plan_cache_size"
+
+	// TiDBEnableGeneralPlanCache indicates whether to enable general plan cache.
+	TiDBEnableGeneralPlanCache = "tidb_enable_general_plan_cache"
+	// TiDBGeneralPlanCacheSize controls the size of general plan cache.
+	TiDBGeneralPlanCacheSize = "tidb_general_plan_cache_size"
+
+	// TiDBConstraintCheckInPlacePessimistic controls whether to skip certain kinds of pessimistic locks.
+	TiDBConstraintCheckInPlacePessimistic = "tidb_constraint_check_in_place_pessimistic"
+
+	// TiDBEnableForeignKey indicates whether to enable the foreign key feature.
+	// TODO(crazycs520): remove this after foreign key GA.
+	TiDBEnableForeignKey = "tidb_enable_foreign_key"
+
+	// TiDBOptRangeMaxSize is the max memory limit for ranges. When the optimizer estimates that the memory usage of complete
+	// ranges would exceed the limit, it chooses less accurate ranges such as full range. 0 indicates that there is no memory
+	// limit for ranges.
+	TiDBOptRangeMaxSize = "tidb_opt_range_max_size"
+
+	// TiDBMergePartitionStatsConcurrency indicates the concurrency when merging partition stats into global stats
+	TiDBMergePartitionStatsConcurrency = "tidb_merge_partition_stats_concurrency"
+>>>>>>> e8d265981a (statistics: support merge global topn in concurrency (#38358))
 )
 
 // TiDB vars that have only global scope
@@ -654,6 +740,7 @@ const (
 
 // Default TiDB system variable values.
 const (
+<<<<<<< HEAD
 	DefHostname                   = "localhost"
 	DefIndexLookupConcurrency     = ConcurrencyUnset
 	DefIndexLookupJoinConcurrency = ConcurrencyUnset
@@ -799,6 +886,226 @@ const (
 	DefTiDBEnableColumnTracking  = false
 	DefTiDBStatsLoadSyncWait     = 0
 	DefTiDBStatsLoadPseudoTimeout = false
+=======
+	DefHostname                       = "localhost"
+	DefIndexLookupConcurrency         = ConcurrencyUnset
+	DefIndexLookupJoinConcurrency     = ConcurrencyUnset
+	DefIndexSerialScanConcurrency     = 1
+	DefIndexJoinBatchSize             = 25000
+	DefIndexLookupSize                = 20000
+	DefDistSQLScanConcurrency         = 15
+	DefBuildStatsConcurrency          = 4
+	DefAutoAnalyzeRatio               = 0.5
+	DefAutoAnalyzeStartTime           = "00:00 +0000"
+	DefAutoAnalyzeEndTime             = "23:59 +0000"
+	DefAutoIncrementIncrement         = 1
+	DefAutoIncrementOffset            = 1
+	DefChecksumTableConcurrency       = 4
+	DefSkipUTF8Check                  = false
+	DefSkipASCIICheck                 = false
+	DefOptAggPushDown                 = false
+	DefOptCartesianBCJ                = 1
+	DefOptMPPOuterJoinFixedBuildSide  = false
+	DefOptWriteRowID                  = false
+	DefOptEnableCorrelationAdjustment = true
+	DefOptLimitPushDownThreshold      = 100
+	DefOptCorrelationThreshold        = 0.9
+	DefOptCorrelationExpFactor        = 1
+	DefOptCPUFactor                   = 3.0
+	DefOptCopCPUFactor                = 3.0
+	DefOptTiFlashConcurrencyFactor    = 24.0
+	DefOptNetworkFactor               = 1.0
+	DefOptScanFactor                  = 1.5
+	DefOptDescScanFactor              = 3.0
+	DefOptSeekFactor                  = 20.0
+	DefOptMemoryFactor                = 0.001
+	DefOptDiskFactor                  = 1.5
+	DefOptConcurrencyFactor           = 3.0
+	DefOptForceInlineCTE              = false
+	DefOptInSubqToJoinAndAgg          = true
+	DefOptPreferRangeScan             = false
+	DefBatchInsert                    = false
+	DefBatchDelete                    = false
+	DefBatchCommit                    = false
+	DefCurretTS                       = 0
+	DefInitChunkSize                  = 32
+	DefMinPagingSize                  = int(paging.MinPagingSize)
+	DefMaxPagingSize                  = int(paging.MaxPagingSize)
+	DefMaxChunkSize                   = 1024
+	DefDMLBatchSize                   = 0
+	DefMaxPreparedStmtCount           = -1
+	DefWaitTimeout                    = 28800
+	DefTiDBMemQuotaApplyCache         = 32 << 20 // 32MB.
+	DefTiDBMemQuotaBindingCache       = 64 << 20 // 64MB.
+	DefTiDBGeneralLog                     = false
+	DefTiDBPProfSQLCPU                    = 0
+	DefTiDBRetryLimit                     = 10
+	DefTiDBDisableTxnAutoRetry            = true
+	DefTiDBConstraintCheckInPlace         = false
+	DefTiDBHashJoinConcurrency            = ConcurrencyUnset
+	DefTiDBProjectionConcurrency          = ConcurrencyUnset
+	DefBroadcastJoinThresholdSize         = 100 * 1024 * 1024
+	DefBroadcastJoinThresholdCount        = 10 * 1024
+	DefTiDBOptimizerSelectivityLevel      = 0
+	DefTiDBOptimizerEnableNewOFGB         = false
+	DefTiDBEnableOuterJoinReorder         = false
+	DefTiDBEnableNAAJ                     = false
+	DefTiDBAllowBatchCop                  = 1
+	DefTiDBAllowMPPExecution              = true
+	DefTiDBHashExchangeWithNewCollation   = true
+	DefTiDBEnforceMPPExecution            = false
+	DefTiFlashMaxThreads                  = -1
+	DefTiDBMPPStoreFailTTL                = "60s"
+	DefTiDBTxnMode                        = ""
+	DefTiDBRowFormatV1                    = 1
+	DefTiDBRowFormatV2                    = 2
+	DefTiDBDDLReorgWorkerCount            = 4
+	DefTiDBDDLReorgBatchSize              = 256
+	DefTiDBDDLFlashbackConcurrency        = 64
+	DefTiDBDDLErrorCountLimit             = 512
+	DefTiDBMaxDeltaSchemaCount            = 1024
+	DefTiDBPlacementMode                  = PlacementModeStrict
+	DefTiDBEnableAutoIncrementInGenerated = false
+	DefTiDBHashAggPartialConcurrency      = ConcurrencyUnset
+	DefTiDBHashAggFinalConcurrency        = ConcurrencyUnset
+	DefTiDBWindowConcurrency              = ConcurrencyUnset
+	DefTiDBMergeJoinConcurrency           = 1 // disable optimization by default
+	DefTiDBStreamAggConcurrency           = 1
+	DefTiDBForcePriority                  = mysql.NoPriority
+	DefEnableWindowFunction               = true
+	DefEnablePipelinedWindowFunction      = true
+	DefEnableStrictDoubleTypeCheck        = true
+	DefEnableVectorizedExpression         = true
+	DefTiDBOptJoinReorderThreshold        = 0
+	DefTiDBDDLSlowOprThreshold            = 300
+	DefTiDBUseFastAnalyze                 = false
+	DefTiDBSkipIsolationLevelCheck        = false
+	DefTiDBExpensiveQueryTimeThreshold    = 60 // 60s
+	DefTiDBScatterRegion                  = false
+	DefTiDBWaitSplitRegionFinish          = true
+	DefWaitSplitRegionTimeout             = 300 // 300s
+	DefTiDBEnableNoopFuncs                = Off
+	DefTiDBEnableNoopVariables            = true
+	DefTiDBAllowRemoveAutoInc             = false
+	DefTiDBUsePlanBaselines               = true
+	DefTiDBEvolvePlanBaselines            = false
+	DefTiDBEvolvePlanTaskMaxTime          = 600 // 600s
+	DefTiDBEvolvePlanTaskStartTime        = "00:00 +0000"
+	DefTiDBEvolvePlanTaskEndTime          = "23:59 +0000"
+	DefInnodbLockWaitTimeout              = 50 // 50s
+	DefTiDBStoreLimit                     = 0
+	DefTiDBMetricSchemaStep               = 60 // 60s
+	DefTiDBMetricSchemaRangeDuration      = 60 // 60s
+	DefTiDBFoundInPlanCache               = false
+	DefTiDBFoundInBinding                 = false
+	DefTiDBEnableCollectExecutionInfo     = true
+	DefTiDBAllowAutoRandExplicitInsert    = false
+	DefTiDBEnableClusteredIndex           = ClusteredIndexDefModeOn
+	DefTiDBRedactLog                      = false
+	DefTiDBRestrictedReadOnly             = false
+	DefTiDBSuperReadOnly                  = false
+	DefTiDBShardAllocateStep              = math.MaxInt64
+	DefTiDBEnableTelemetry                = true
+	DefTiDBEnableParallelApply            = false
+	DefTiDBEnableAmendPessimisticTxn      = false
+	DefTiDBPartitionPruneMode             = "dynamic"
+	DefTiDBEnableRateLimitAction          = false
+	DefTiDBEnableAsyncCommit              = false
+	DefTiDBEnable1PC                      = false
+	DefTiDBGuaranteeLinearizability       = true
+	DefTiDBAnalyzeVersion                 = 2
+	DefTiDBAutoAnalyzePartitionBatchSize  = 1
+	DefTiDBEnableIndexMergeJoin           = false
+	DefTiDBTrackAggregateMemoryUsage      = true
+	DefTiDBEnableExchangePartition        = true
+	DefCTEMaxRecursionDepth               = 1000
+	DefTiDBTmpTableMaxSize                = 64 << 20 // 64MB.
+	DefTiDBEnableLocalTxn                    = false
+	DefTiDBTSOClientBatchMaxWaitTime         = 0.0 // 0ms
+	DefTiDBEnableTSOFollowerProxy            = false
+	DefTiDBEnableOrderedResultMode           = false
+	DefTiDBEnablePseudoForOutdatedStats      = false
+	DefTiDBRegardNULLAsPoint                 = true
+	DefEnablePlacementCheck                  = true
+	DefTimestamp                             = "0"
+	DefTimestampFloat                        = 0.0
+	DefTiDBEnableStmtSummary                 = true
+	DefTiDBStmtSummaryInternalQuery          = false
+	DefTiDBStmtSummaryRefreshInterval        = 1800
+	DefTiDBStmtSummaryHistorySize            = 24
+	DefTiDBStmtSummaryMaxStmtCount           = 3000
+	DefTiDBStmtSummaryMaxSQLLength           = 4096
+	DefTiDBCapturePlanBaseline               = Off
+	DefTiDBEnableIndexMerge                  = true
+	DefEnableLegacyInstanceScope             = true
+	DefTiDBTableCacheLease                   = 3 // 3s
+	DefTiDBPersistAnalyzeOptions             = true
+	DefTiDBEnableColumnTracking             = false
+	DefTiDBStatsLoadSyncWait                 = 0
+	DefTiDBStatsLoadPseudoTimeout            = true
+	DefSysdateIsNow                          = false
+	DefTiDBEnableMutationChecker             = false
+	DefTiDBTxnAssertionLevel                 = AssertionOffStr
+	DefTiDBIgnorePreparedCacheCloseStmt      = false
+	DefTiDBBatchPendingTiFlashCount          = 4000
+	DefRCReadCheckTS                         = false
+	DefTiDBRemoveOrderbyInSubquery           = false
+	DefTiDBSkewDistinctAgg                   = false
+	DefTiDB3StageDistinctAgg                 = true
+	DefTiDBReadStaleness                     = 0
+	DefTiDBGCMaxWaitTime                     = 24 * 60 * 60
+	DefMaxAllowedPacket               uint64 = 67108864
+	DefTiDBEnableBatchDML                    = false
+	DefTiDBMemQuotaQuery                     = 1073741824 // 1GB
+	DefTiDBStatsCacheMemQuota                = 0
+	MaxTiDBStatsCacheMemQuota                = 1024 * 1024 * 1024 * 1024 // 1TB
+	DefTiDBQueryLogMaxLen                    = 4096
+	DefRequireSecureTransport                = false
+	DefTiDBCommitterConcurrency              = 128
+	DefTiDBBatchDMLIgnoreError               = false
+	DefTiDBMemQuotaAnalyze                   = -1
+	DefTiDBEnableAutoAnalyze                 = true
+	DefTiDBMemOOMAction                      = "CANCEL"
+	DefTiDBMaxAutoAnalyzeTime                = 12 * 60 * 60
+	DefTiDBEnablePrepPlanCache               = true
+	DefTiDBPrepPlanCacheSize                 = 100
+	DefTiDBPrepPlanCacheMemoryGuardRatio     = 0.1
+	DefTiDBEnableConcurrentDDL               = concurrencyddl.TiDBEnableConcurrentDDL
+	DefTiDBSimplifiedMetrics                 = false
+	DefTiDBEnablePaging                      = true
+	DefTiFlashFineGrainedShuffleStreamCount  = 0
+	DefStreamCountWhenMaxThreadsNotSet       = 8
+	DefTiFlashFineGrainedShuffleBatchSize    = 8192
+	DefAdaptiveClosestReadThreshold          = 4096
+	DefTiDBEnableAnalyzeSnapshot             = false
+	DefTiDBGenerateBinaryPlan                = true
+	DefEnableTiDBGCAwareMemoryTrack          = true
+	DefTiDBDefaultStrMatchSelectivity        = 0.8
+	DefTiDBEnableTmpStorageOnOOM             = true
+	DefTiDBEnableMDL                         = false
+	DefTiFlashFastScan                       = false
+	DefMemoryUsageAlarmRatio                 = 0.7
+	DefMemoryUsageAlarmKeepRecordNum         = 5
+	DefTiDBEnableFastReorg                   = false
+	DefTiDBDDLDiskQuota                      = 100 * 1024 * 1024 * 1024 // 100GB
+	DefExecutorConcurrency                   = 5
+	DefTiDBEnableGeneralPlanCache            = false
+	DefTiDBGeneralPlanCacheSize              = 100
+	DefTiDBEnableTiFlashReadForWriteStmt     = false
+	// MaxDDLReorgBatchSize is exported for testing.
+	MaxDDLReorgBatchSize           int32  = 10240
+	MinDDLReorgBatchSize           int32  = 32
+	MinExpensiveQueryTimeThreshold uint64 = 10 // 10s
+	DefTiDBRcWriteCheckTs                    = false
+	DefTiDBConstraintCheckInPlacePessimistic = true
+	DefTiDBForeignKeyChecks                  = false
+	DefTiDBOptRangeMaxSize                   = 0
+	DefTiDBCostModelVer                      = 1
+	DefTiDBServerMemoryLimitSessMinSize      = 128 << 20
+	DefTiDBMergePartitionStatsConcurrency    = 1
+	DefTiDBServerMemoryLimitGCTrigger        = 0.7
+	DefTiDBEnableGOGCTuner                   = true
+>>>>>>> e8d265981a (statistics: support merge global topn in concurrency (#38358))
 )
 
 // Process global variables.
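Before reading the cmsketch.go changes below, it helps to pin down what merging partition-level TopNs computes: per-value counts are summed across partitions, and only the n largest values survive as the global TopN while the rest are handed back for histogram merging. A self-contained sketch of that core idea (simplified: plain strings instead of encoded datums, and no histogram cross-checking):

```go
package main

import (
	"fmt"
	"sort"
)

func main() {
	// Two partition-level TopNs, value -> count.
	parts := []map[string]uint64{
		{"a": 10, "b": 5},
		{"a": 3, "c": 8},
	}
	merged := map[string]uint64{}
	for _, p := range parts {
		for v, c := range p {
			merged[v] += c // same value in different TopNs is summed
		}
	}
	type meta struct {
		val   string
		count uint64
	}
	all := make([]meta, 0, len(merged))
	for v, c := range merged {
		all = append(all, meta{v, c})
	}
	// Count descending, value ascending on ties: the ordering SortTopnMeta uses.
	sort.Slice(all, func(i, j int) bool {
		if all[i].count != all[j].count {
			return all[i].count > all[j].count
		}
		return all[i].val < all[j].val
	})
	n := 2
	fmt.Println("global topn:", all[:n], "left for histogram:", all[n:])
}
```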
diff --git a/statistics/cmsketch.go b/statistics/cmsketch.go
index 5e5fad01addab..45eb78d331743 100644
--- a/statistics/cmsketch.go
+++ b/statistics/cmsketch.go
@@ -21,6 +21,7 @@ import (
 	"reflect"
 	"sort"
 	"strings"
+	"time"
 
 	"github.com/pingcap/tidb/sessionctx"
@@ -701,10 +702,17 @@ func NewTopN(n int) *TopN {
 // 2. `n` is the size of the global-level topN. Notice: This value can be 0 and has no default value, we must explicitly specify this value.
 // 3. `hists` are the partition-level histograms. Some values not in topN may be placed in the histogram. We need it here to make the value in the global-level TopN more accurate.
 // The output parameters:
+<<<<<<< HEAD
 // 1. `*TopN` is the final global-level topN.
 // 2. `[]TopNMeta` is the left topN value from the partition-level TopNs, but is not placed into the global-level TopN. We should put them back to the histogram later.
 // 3. `[]*Histogram` are the partition-level histograms which just delete some values when we merge the global-level topN.
 func MergePartTopN2GlobalTopN(sc *stmtctx.StatementContext, version int, topNs []*TopN, n uint32, hists []*Histogram, isIndex bool) (*TopN, []TopNMeta, []*Histogram, error) {
+=======
+// 1. `*TopN` is the final global-level topN.
+// 2. `[]TopNMeta` is the left topN value from the partition-level TopNs, but is not placed into the global-level TopN. We should put them back to the histogram later.
+// 3. `[]*Histogram` are the partition-level histograms which just delete some values when we merge the global-level topN.
+func MergePartTopN2GlobalTopN(loc *time.Location, version int, topNs []*TopN, n uint32, hists []*Histogram, isIndex bool) (*TopN, []TopNMeta, []*Histogram, error) {
+>>>>>>> e8d265981a (statistics: support merge global topn in concurrency (#38358))
 	if checkEmptyTopNs(topNs) {
 		return nil, nil, hists, nil
 	}
@@ -756,9 +764,15 @@ func MergePartTopN2GlobalTopN(sc *stmtctx.StatementContext, version int, topNs [
 		var err error
 		if types.IsTypeTime(hists[0].Tp.Tp) {
 			// handle datetime values specially since they are encoded to int and we'll get int values if using DecodeOne.
+<<<<<<< HEAD
 			_, d, err = codec.DecodeAsDateTime(val.Encoded, hists[0].Tp.Tp, sc.TimeZone)
 		} else if types.IsTypeFloat(hists[0].Tp.Tp) {
 			_, d, err = codec.DecodeAsFloat32(val.Encoded, hists[0].Tp.Tp)
+=======
+			_, d, err = codec.DecodeAsDateTime(val.Encoded, hists[0].Tp.GetType(), loc)
+		} else if types.IsTypeFloat(hists[0].Tp.GetType()) {
+			_, d, err = codec.DecodeAsFloat32(val.Encoded, hists[0].Tp.GetType())
+>>>>>>> e8d265981a (statistics: support merge global topn in concurrency (#38358))
 		} else {
 			_, d, err = codec.DecodeOne(val.Encoded)
 		}
@@ -841,6 +855,22 @@ func checkEmptyTopNs(topNs []*TopN) bool {
 	return count == 0
 }
 
+// SortTopnMeta sorts the given TopNMeta slice by count in descending order, breaking ties by the encoded bytes in ascending order.
+func SortTopnMeta(topnMetas []TopNMeta) []TopNMeta {
+	slices.SortFunc(topnMetas, func(i, j TopNMeta) bool {
+		if i.Count != j.Count {
+			return i.Count > j.Count
+		}
+		return bytes.Compare(i.Encoded, j.Encoded) < 0
+	})
+	return topnMetas
+}
+
+// GetMergedTopNFromSortedSlice exports getMergedTopNFromSortedSlice, returning the merged topn and the leftover TopNMeta.
+func GetMergedTopNFromSortedSlice(sorted []TopNMeta, n uint32) (*TopN, []TopNMeta) {
+	return getMergedTopNFromSortedSlice(sorted, n)
+}
+
 func getMergedTopNFromSortedSlice(sorted []TopNMeta, n uint32) (*TopN, []TopNMeta) {
 	sort.Slice(sorted, func(i, j int) bool {
 		if sorted[i].Count != sorted[j].Count {
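The two helpers exported above, SortTopnMeta and GetMergedTopNFromSortedSlice, exist so the concurrent path in handle.go can reuse the single-threaded merge logic. A hedged usage sketch; it only compiles inside a module that can import github.com/pingcap/tidb:

```go
package main

import (
	"fmt"

	"github.com/pingcap/tidb/statistics"
)

func main() {
	metas := []statistics.TopNMeta{
		{Encoded: []byte("b"), Count: 10},
		{Encoded: []byte("a"), Count: 10},
		{Encoded: []byte("c"), Count: 30},
	}
	// Count descending, bytes ascending on ties.
	sorted := statistics.SortTopnMeta(metas)
	// Keep the top 2; the remainder is handed back for histogram merging.
	topn, left := statistics.GetMergedTopNFromSortedSlice(sorted, 2)
	fmt.Println(topn.TopN, left)
}
```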
diff --git a/statistics/handle/handle.go b/statistics/handle/handle.go
index 2b07e9aebcb99..fb32bb99334ff 100644
--- a/statistics/handle/handle.go
+++ b/statistics/handle/handle.go
@@ -15,6 +15,7 @@
 package handle
 
 import (
+	"bytes"
 	"context"
 	"encoding/json"
 	"fmt"
@@ -29,7 +30,7 @@ import (
 	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/tidb/config"
-	"github.com/pingcap/tidb/ddl/util"
+	ddlUtil "github.com/pingcap/tidb/ddl/util"
 	"github.com/pingcap/tidb/infoschema"
 	"github.com/pingcap/tidb/parser/ast"
 	"github.com/pingcap/tidb/parser/model"
@@ -40,6 +41,7 @@ import (
 	"github.com/pingcap/tidb/statistics"
 	"github.com/pingcap/tidb/table"
 	"github.com/pingcap/tidb/types"
+	"github.com/pingcap/tidb/util"
 	"github.com/pingcap/tidb/util/chunk"
 	"github.com/pingcap/tidb/util/logutil"
 	"github.com/pingcap/tidb/util/memory"
@@ -52,6 +54,9 @@ import (
 const (
 	// TiDBGlobalStats represents the global-stats for a partitioned table.
 	TiDBGlobalStats = "global"
+
+	// maxPartitionMergeBatchSize indicates the max batch size for a worker to merge partition stats
+	maxPartitionMergeBatchSize = 256
 )
 
 // statsCache caches the tables in memory for Handle.
@@ -96,7 +101,7 @@ type Handle struct {
 
 	// ddlEventCh is a channel to notify a ddl operation has happened.
 	// It is sent only by owner or the drop stats executor, and read by stats handle.
-	ddlEventCh chan *util.Event
+	ddlEventCh chan *ddlUtil.Event
 	// listHead contains all the stats collector required by session.
 	listHead *SessionStatsCollector
 	// globalMap contains all the delta map from collectors when we dump them to KV.
@@ -210,7 +215,7 @@ type sessionPool interface {
 func NewHandle(ctx sessionctx.Context, lease time.Duration, pool sessionPool) (*Handle, error) {
 	cfg := config.GetGlobalConfig()
 	handle := &Handle{
-		ddlEventCh:       make(chan *util.Event, 100),
+		ddlEventCh:       make(chan *ddlUtil.Event, 100),
 		listHead:         &SessionStatsCollector{mapper: make(tableDeltaMap), rateMap: make(errorRateDeltaMap)},
 		idxUsageListHead: &SessionIndexUsageCollector{mapper: make(indexUsageMap)},
 		pool:             pool,
@@ -466,7 +471,8 @@ func (h *Handle) mergePartitionStats2GlobalStats(sc sessionctx.Context, opts map
 			// Because after merging TopN, some numbers will be left.
 			// These remaining topN numbers will be used as a separate bucket for later histogram merging.
 			var popedTopN []statistics.TopNMeta
-			globalStats.TopN[i], popedTopN, allHg[i], err = statistics.MergePartTopN2GlobalTopN(sc.GetSessionVars().StmtCtx, sc.GetSessionVars().AnalyzeVersion, allTopN[i], uint32(opts[ast.AnalyzeOptNumTopN]), allHg[i], isIndex == 1)
+			wrapper := statistics.NewStatsWrapper(allHg[i], allTopN[i])
+			globalStats.TopN[i], popedTopN, allHg[i], err = h.mergeGlobalStatsTopN(sc, wrapper, sc.GetSessionVars().StmtCtx.TimeZone, sc.GetSessionVars().AnalyzeVersion, uint32(opts[ast.AnalyzeOptNumTopN]), isIndex == 1)
 			if err != nil {
 				return
 			}
@@ -498,6 +504,104 @@ func (h *Handle) mergePartitionStats2GlobalStats(sc sessionctx.Context, opts map
 	return
 }
 
+func (h *Handle) mergeGlobalStatsTopN(sc sessionctx.Context, wrapper *statistics.StatsWrapper,
+	timeZone *time.Location, version int, n uint32, isIndex bool) (*statistics.TopN,
+	[]statistics.TopNMeta, []*statistics.Histogram, error) {
+	mergeConcurrency := sc.GetSessionVars().AnalyzePartitionMergeConcurrency
+	// use the original method if the concurrency is 1 or for version 1
+	if mergeConcurrency < 2 {
+		return statistics.MergePartTopN2GlobalTopN(timeZone, version, wrapper.AllTopN, n, wrapper.AllHg, isIndex)
+	}
+	batchSize := len(wrapper.AllTopN) / mergeConcurrency
+	if batchSize < 1 {
+		batchSize = 1
+	} else if batchSize > maxPartitionMergeBatchSize {
+		batchSize = maxPartitionMergeBatchSize
+	}
+	return h.mergeGlobalStatsTopNByConcurrency(mergeConcurrency, batchSize, wrapper, timeZone, version, n, isIndex)
+}
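mergeGlobalStatsTopN above clamps the batch size into [1, maxPartitionMergeBatchSize], and mergeGlobalStatsTopNByConcurrency below cuts the partition slice into half-open [start, end) tasks of that size. A self-contained sketch of just that splitting arithmetic:

```go
package main

import "fmt"

const maxPartitionMergeBatchSize = 256

// splitTasks mirrors the batching above: batch size is partitions/concurrency
// clamped to [1, 256], and tasks are half-open [start, end) ranges.
func splitTasks(partitions, concurrency int) [][2]int {
	batch := partitions / concurrency
	if batch < 1 {
		batch = 1
	} else if batch > maxPartitionMergeBatchSize {
		batch = maxPartitionMergeBatchSize
	}
	var tasks [][2]int
	for start := 0; start < partitions; {
		end := start + batch
		if end > partitions {
			end = partitions
		}
		tasks = append(tasks, [2]int{start, end})
		start = end
	}
	return tasks
}

func main() {
	fmt.Println(splitTasks(10, 4)) // [[0 2] [2 4] [4 6] [6 8] [8 10]]
}
```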
+// mergeGlobalStatsTopNByConcurrency merges the partition topN by concurrency.
+// To merge the global stats topn by concurrency, we separate the partition topn into several batches and assign each batch to a worker.
+// mergeConcurrency is used to control the total concurrency of the running workers, and mergeBatchSize is used to control
+// the number of partitions each worker handles.
+func (h *Handle) mergeGlobalStatsTopNByConcurrency(mergeConcurrency, mergeBatchSize int, wrapper *statistics.StatsWrapper,
+	timeZone *time.Location, version int, n uint32, isIndex bool) (*statistics.TopN,
+	[]statistics.TopNMeta, []*statistics.Histogram, error) {
+	if len(wrapper.AllTopN) < mergeConcurrency {
+		mergeConcurrency = len(wrapper.AllTopN)
+	}
+	tasks := make([]*statistics.TopnStatsMergeTask, 0)
+	for start := 0; start < len(wrapper.AllTopN); {
+		end := start + mergeBatchSize
+		if end > len(wrapper.AllTopN) {
+			end = len(wrapper.AllTopN)
+		}
+		task := statistics.NewTopnStatsMergeTask(start, end)
+		tasks = append(tasks, task)
+		start = end
+	}
+	var wg util.WaitGroupWrapper
+	taskNum := len(tasks)
+	taskCh := make(chan *statistics.TopnStatsMergeTask, taskNum)
+	respCh := make(chan *statistics.TopnStatsMergeResponse, taskNum)
+	for i := 0; i < mergeConcurrency; i++ {
+		worker := statistics.NewTopnStatsMergeWorker(taskCh, respCh, wrapper)
+		wg.Run(func() {
+			worker.Run(timeZone, isIndex, n, version)
+		})
+	}
+	for _, task := range tasks {
+		taskCh <- task
+	}
+	close(taskCh)
+	wg.Wait()
+	close(respCh)
+	resps := make([]*statistics.TopnStatsMergeResponse, 0)
+
+	// handle errors
+	hasErr := false
+	for resp := range respCh {
+		if resp.Err != nil {
+			hasErr = true
+		}
+		resps = append(resps, resp)
+	}
+	if hasErr {
+		errMsg := make([]string, 0)
+		for _, resp := range resps {
+			if resp.Err != nil {
+				errMsg = append(errMsg, resp.Err.Error())
+			}
+		}
+		return nil, nil, nil, errors.New(strings.Join(errMsg, ","))
+	}
+
+	// fetch the response from each worker and merge them into global topn stats
+	sorted := make([]statistics.TopNMeta, 0, mergeConcurrency)
+	leftTopn := make([]statistics.TopNMeta, 0)
+	for _, resp := range resps {
+		if resp.TopN != nil {
+			sorted = append(sorted, resp.TopN.TopN...)
+		}
+		leftTopn = append(leftTopn, resp.PopedTopn...)
+		for i, removeTopn := range resp.RemoveVals {
+			// Remove the value from the Hists.
+			if len(removeTopn) > 0 {
+				tmp := removeTopn
+				slices.SortFunc(tmp, func(i, j statistics.TopNMeta) bool {
+					cmpResult := bytes.Compare(i.Encoded, j.Encoded)
+					return cmpResult < 0
+				})
+				wrapper.AllHg[i].RemoveVals(tmp)
+			}
+		}
+	}
+
+	globalTopN, popedTopn := statistics.GetMergedTopNFromSortedSlice(sorted, n)
+	return globalTopN, statistics.SortTopnMeta(append(leftTopn, popedTopn...)), wrapper.AllHg, nil
+}
+
 func (h *Handle) getTableByPhysicalID(is infoschema.InfoSchema, physicalID int64) (table.Table, bool) {
 	if is.SchemaMetaVersion() != h.mu.schemaVersion {
 		h.mu.schemaVersion = is.SchemaMetaVersion()
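merge_worker.go below supplies the worker side of the wiring used above: a buffered task channel is filled and closed, N workers drain it, responses are collected after wg.Wait(), and any errors are joined into one message. A self-contained skeleton of that fan-out/fan-in pattern, with generic payloads standing in for the stats types:

```go
package main

import (
	"errors"
	"fmt"
	"strings"
	"sync"
)

type task struct{ start, end int }
type response struct {
	result string
	err    error
}

func main() {
	tasks := []task{{0, 2}, {2, 4}, {4, 5}}
	taskCh := make(chan task, len(tasks))
	respCh := make(chan response, len(tasks)) // buffered so workers never block

	var wg sync.WaitGroup
	for i := 0; i < 2; i++ { // two workers, mirroring mergeConcurrency
		wg.Add(1)
		go func() {
			defer wg.Done()
			for t := range taskCh {
				// A real worker would merge AllTopN[t.start:t.end] here.
				respCh <- response{result: fmt.Sprintf("[%d,%d)", t.start, t.end)}
			}
		}()
	}
	for _, t := range tasks {
		taskCh <- t
	}
	close(taskCh)
	wg.Wait()
	close(respCh)

	var errMsgs []string
	for resp := range respCh {
		if resp.err != nil {
			errMsgs = append(errMsgs, resp.err.Error())
			continue
		}
		fmt.Println("merged", resp.result)
	}
	if len(errMsgs) > 0 {
		fmt.Println(errors.New(strings.Join(errMsgs, ",")))
	}
}
```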
diff --git a/statistics/merge_worker.go b/statistics/merge_worker.go
new file mode 100644
index 0000000000000..ac34605835559
--- /dev/null
+++ b/statistics/merge_worker.go
@@ -0,0 +1,188 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package statistics
+
+import (
+	"time"
+
+	"github.com/pingcap/tidb/types"
+	"github.com/pingcap/tidb/util/codec"
+	"github.com/pingcap/tidb/util/hack"
+)
+
+// StatsWrapper wraps the partition-level histograms and TopNs to be merged.
+type StatsWrapper struct {
+	AllHg   []*Histogram
+	AllTopN []*TopN
+}
+
+// NewStatsWrapper returns a new StatsWrapper.
+func NewStatsWrapper(hg []*Histogram, topN []*TopN) *StatsWrapper {
+	return &StatsWrapper{
+		AllHg:   hg,
+		AllTopN: topN,
+	}
+}
+
+type topnStatsMergeWorker struct {
+	taskCh <-chan *TopnStatsMergeTask
+	respCh chan<- *TopnStatsMergeResponse
+	// the stats in the wrapper should only be read during the worker
+	statsWrapper *StatsWrapper
+}
+
+// NewTopnStatsMergeWorker returns a new topn stats merge worker.
+func NewTopnStatsMergeWorker(
+	taskCh <-chan *TopnStatsMergeTask,
+	respCh chan<- *TopnStatsMergeResponse,
+	wrapper *StatsWrapper) *topnStatsMergeWorker {
+	worker := &topnStatsMergeWorker{
+		taskCh: taskCh,
+		respCh: respCh,
+	}
+	worker.statsWrapper = wrapper
+	return worker
+}
+
+// TopnStatsMergeTask indicates a task for merging topn stats.
+type TopnStatsMergeTask struct {
+	start int
+	end   int
+}
+
+// NewTopnStatsMergeTask returns a merge task covering the partitions in [start, end).
+func NewTopnStatsMergeTask(start, end int) *TopnStatsMergeTask {
+	return &TopnStatsMergeTask{
+		start: start,
+		end:   end,
+	}
+}
+
+// TopnStatsMergeResponse indicates the topn merge worker response.
+type TopnStatsMergeResponse struct {
+	TopN       *TopN
+	PopedTopn  []TopNMeta
+	RemoveVals [][]TopNMeta
+	Err        error
+}
+
+// Run runs the topn merge like statistics.MergePartTopN2GlobalTopN.
+func (worker *topnStatsMergeWorker) Run(timeZone *time.Location, isIndex bool,
+	n uint32,
+	version int) {
+	for task := range worker.taskCh {
+		start := task.start
+		end := task.end
+		checkTopNs := worker.statsWrapper.AllTopN[start:end]
+		allTopNs := worker.statsWrapper.AllTopN
+		allHists := worker.statsWrapper.AllHg
+		resp := &TopnStatsMergeResponse{}
+		if checkEmptyTopNs(checkTopNs) {
+			worker.respCh <- resp
+			continue // do not return: later tasks in the channel may still need this worker
+		}
+		partNum := len(allTopNs)
+		checkNum := len(checkTopNs)
+		topNsNum := make([]int, checkNum)
+		removeVals := make([][]TopNMeta, partNum)
+		for i, topN := range checkTopNs {
+			if topN == nil {
+				topNsNum[i] = 0
+				continue
+			}
+			topNsNum[i] = len(topN.TopN)
+		}
+		// Different TopN structures may hold the same value, we have to merge them.
+		counter := make(map[hack.MutableString]float64)
+		// datumMap is used to store the mapping from the string type to datum type.
+		// The datum is used to find the value in the histogram.
+		datumMap := make(map[hack.MutableString]types.Datum)
+
+		for i, topN := range checkTopNs {
+			if topN.TotalCount() == 0 {
+				continue
+			}
+			for _, val := range topN.TopN {
+				encodedVal := hack.String(val.Encoded)
+				_, exists := counter[encodedVal]
+				counter[encodedVal] += float64(val.Count)
+				if exists {
+					// We have already calculated the encodedVal from the histogram, so just continue to the next topN value.
+					continue
+				}
+				// We need to check whether the value corresponding to encodedVal is contained in other partition-level stats.
+				// 1. Check the topN first.
+				// 2. If the topN doesn't contain the value corresponding to encodedVal, we should check the histogram.
+				for j := 0; j < partNum; j++ {
+					if (j == i && version >= 2) || allTopNs[j].findTopN(val.Encoded) != -1 {
+						continue
+					}
+					// Get the encodedVal from the hists[j]
+					datum, exists := datumMap[encodedVal]
+					if !exists {
+						// If the datumMap does not have the encodedVal datum,
+						// we should generate the datum based on the encoded value.
+						// This part is copied from the function MergePartitionHist2GlobalHist.
+						var d types.Datum
+						if isIndex {
+							d.SetBytes(val.Encoded)
+						} else {
+							var err error
+							if types.IsTypeTime(allHists[0].Tp.GetType()) {
+								// handle datetime values specially since they are encoded to int and we'll get int values if using DecodeOne.
+								_, d, err = codec.DecodeAsDateTime(val.Encoded, allHists[0].Tp.GetType(), timeZone)
+							} else if types.IsTypeFloat(allHists[0].Tp.GetType()) {
+								_, d, err = codec.DecodeAsFloat32(val.Encoded, allHists[0].Tp.GetType())
+							} else {
+								_, d, err = codec.DecodeOne(val.Encoded)
+							}
+							if err != nil {
+								resp.Err = err
+								worker.respCh <- resp
+								return
+							}
+						}
+						datumMap[encodedVal] = d
+						datum = d
+					}
+					// Get the row count of the value equal to encodedVal from the histogram hists[j].
+					count, _ := allHists[j].equalRowCount(datum, isIndex)
+					if count != 0 {
+						counter[encodedVal] += count
+						// Remove the value corresponding to encodedVal from the histogram.
+						removeVals[j] = append(removeVals[j], TopNMeta{Encoded: datum.GetBytes(), Count: uint64(count)})
+					}
+				}
+			}
+		}
+		// record the values to be removed from the histograms
+		resp.RemoveVals = removeVals
+
+		numTop := len(counter)
+		if numTop == 0 {
+			worker.respCh <- resp
+			continue
+		}
+		sorted := make([]TopNMeta, 0, numTop)
+		for value, cnt := range counter {
+			data := hack.Slice(string(value))
+			sorted = append(sorted, TopNMeta{Encoded: data, Count: uint64(cnt)})
+		}
+		globalTopN, leftTopN := getMergedTopNFromSortedSlice(sorted, n)
+		resp.TopN = globalTopN
+		resp.PopedTopn = leftTopN
+		worker.respCh <- resp
+	}
+}
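One detail worth noting in Run above: datumMap memoizes the decoded datum per encoded value, so each distinct value is decoded at most once per task no matter how many partitions mention it. The shape of that memoization, reduced to a runnable toy with strconv.Atoi standing in for the codec decoding:

```go
package main

import (
	"fmt"
	"strconv"
)

func main() {
	encodedVals := []string{"1", "2", "1", "3", "2"}
	decoded := map[string]int{} // plays the role of datumMap
	decodes := 0
	for _, enc := range encodedVals {
		d, ok := decoded[enc]
		if !ok {
			// Stand-in for codec.DecodeOne / codec.DecodeAsDateTime.
			d, _ = strconv.Atoi(enc)
			decoded[enc] = d
			decodes++
		}
		_ = d
	}
	fmt.Printf("%d values, %d decodes\n", len(encodedVals), decodes) // 5 values, 3 decodes
}
```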