statistics: support merge global topn in concurrency #38358

Merged 15 commits on Oct 18, 2022
Changes from 12 commits
76 changes: 76 additions & 0 deletions executor/analyze_test.go
@@ -17,6 +17,7 @@ package executor_test
import (
"fmt"
"io/ioutil"
"strconv"
"strings"
"sync/atomic"
"testing"
@@ -338,3 +339,78 @@ func TestAnalyzePartitionTableForFloat(t *testing.T) {
}
tk.MustExec("analyze table t1")
}

func TestAnalyzePartitionTableByConcurrencyInDynamic(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set @@tidb_partition_prune_mode='dynamic'")
tk.MustExec("use test")
tk.MustExec("create table t(id int) partition by hash(id) partitions 4")
testcases := []struct {
concurrency string
}{
{
concurrency: "1",
},
{
concurrency: "2",
},
{
concurrency: "3",
},
{
concurrency: "4",
},
{
concurrency: "5",
},
}
// check the empty-table case first
for _, tc := range testcases {
concurrency := tc.concurrency
fmt.Println("testcase ", concurrency)
tk.MustExec(fmt.Sprintf("set @@tidb_merge_partition_stats_concurrency=%v", concurrency))
tk.MustQuery("select @@tidb_merge_partition_stats_concurrency").Check(testkit.Rows(concurrency))
tk.MustExec("analyze table t")
tk.MustQuery("show stats_topn where partition_name = 'global' and table_name = 't'")
}

for i := 1; i <= 500; i++ {
for j := 1; j <= 20; j++ {
tk.MustExec(fmt.Sprintf("insert into t (id) values (%v)", j))
}
}
var expected [][]interface{}
for i := 1; i <= 20; i++ {
expected = append(expected, []interface{}{
strconv.FormatInt(int64(i), 10), "500",
})
}
testcases = []struct {
concurrency string
}{
{
concurrency: "1",
},
{
concurrency: "2",
},
{
concurrency: "3",
},
{
concurrency: "4",
},
{
concurrency: "5",
},
}
for _, tc := range testcases {
concurrency := tc.concurrency
fmt.Println("testcase ", concurrency)
tk.MustExec(fmt.Sprintf("set @@tidb_merge_partition_stats_concurrency=%v", concurrency))
tk.MustQuery("select @@tidb_merge_partition_stats_concurrency").Check(testkit.Rows(concurrency))
tk.MustExec("analyze table t")
tk.MustQuery("show stats_topn where partition_name = 'global' and table_name = 't'").CheckAt([]int{5, 6}, expected)
}
}
3 changes: 3 additions & 0 deletions sessionctx/variable/session.go
@@ -1271,6 +1271,9 @@ type SessionVars struct {
// LastPlanReplayerToken indicates the last plan replayer token
LastPlanReplayerToken string

// AnalyzePartitionMergeConcurrency indicates the concurrency for merging partition stats
AnalyzePartitionMergeConcurrency int

HookContext
}

7 changes: 7 additions & 0 deletions sessionctx/variable/sysvar.go
@@ -1916,6 +1916,13 @@ var defaultSysVars = []*SysVar{
s.RangeMaxSize = TidbOptInt64(val, DefTiDBOptRangeMaxSize)
return nil
}},
{
Scope: ScopeGlobal | ScopeSession, Name: TiDBMergePartitionStatsConcurrency, Value: strconv.FormatInt(DefTiDBMergePartitionStatsConcurrency, 10), Type: TypeInt, MinValue: 1, MaxValue: MaxConfigurableConcurrency,
SetSession: func(s *SessionVars, val string) error {
s.AnalyzePartitionMergeConcurrency = TidbOptInt(val, DefTiDBMergePartitionStatsConcurrency)
return nil
},
},
}

// FeedbackProbability points to the FeedbackProbability in statistics package.
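As a side note (not part of the diff), a minimal sketch of how the new variable could be exercised, in the same testkit style as the test above; the test name and the value 4 are placeholders:

func TestSetMergePartitionStatsConcurrency(t *testing.T) {
    store := testkit.CreateMockStore(t)
    tk := testkit.NewTestKit(t, store)
    tk.MustExec("use test")
    // Session scope: affects only ANALYZE statements run in this session.
    tk.MustExec("set @@tidb_merge_partition_stats_concurrency = 4")
    tk.MustQuery("select @@tidb_merge_partition_stats_concurrency").Check(testkit.Rows("4"))
    // Global scope: becomes the default for new sessions (the variable is ScopeGlobal | ScopeSession).
    tk.MustExec("set @@global.tidb_merge_partition_stats_concurrency = 4")
}
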
4 changes: 4 additions & 0 deletions sessionctx/variable/tidb_vars.go
@@ -749,6 +749,9 @@ const (
// ranges would exceed the limit, it chooses less accurate ranges such as full range. 0 indicates that there is no memory
// limit for ranges.
TiDBOptRangeMaxSize = "tidb_opt_range_max_size"

// TiDBMergePartitionStatsConcurrency indicates the concurrency when merging partition stats into global stats
TiDBMergePartitionStatsConcurrency = "tidb_merge_partition_stats_concurrency"
)

// TiDB vars that have only global scope
@@ -1057,6 +1060,7 @@ const (
DefTiDBOptRangeMaxSize = 0
DefTiDBCostModelVer = 1
DefTiDBServerMemoryLimitSessMinSize = 128 << 20
DefTiDBMergePartitionStatsConcurrency = 1
DefTiDBServerMemoryLimitGCTrigger = 0.7
DefTiDBEnableGOGCTuner = true
)
21 changes: 19 additions & 2 deletions statistics/cmsketch.go
@@ -21,6 +21,7 @@ import (
"reflect"
"sort"
"strings"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
@@ -729,7 +730,7 @@ func NewTopN(n int) *TopN {
// 1. `*TopN` is the final global-level topN.
// 2. `[]TopNMeta` holds the leftover topN values from the partition-level TopNs that are not placed into the global-level TopN. We should put them back into the histogram later.
// 3. `[]*Histogram` are the partition-level histograms, from which some values are deleted when we merge the global-level topN.
func MergePartTopN2GlobalTopN(sc *stmtctx.StatementContext, version int, topNs []*TopN, n uint32, hists []*Histogram, isIndex bool) (*TopN, []TopNMeta, []*Histogram, error) {
func MergePartTopN2GlobalTopN(loc *time.Location, version int, topNs []*TopN, n uint32, hists []*Histogram, isIndex bool) (*TopN, []TopNMeta, []*Histogram, error) {
if checkEmptyTopNs(topNs) {
return nil, nil, hists, nil
}
@@ -781,7 +782,7 @@ func MergePartTopN2GlobalTopN(sc *stmtctx.StatementContext, version int, topNs [
var err error
if types.IsTypeTime(hists[0].Tp.GetType()) {
// handle datetime values specially since they are encoded to int and we'll get int values if using DecodeOne.
_, d, err = codec.DecodeAsDateTime(val.Encoded, hists[0].Tp.GetType(), sc.TimeZone)
_, d, err = codec.DecodeAsDateTime(val.Encoded, hists[0].Tp.GetType(), loc)
} else if types.IsTypeFloat(hists[0].Tp.GetType()) {
_, d, err = codec.DecodeAsFloat32(val.Encoded, hists[0].Tp.GetType())
} else {
@@ -866,6 +867,22 @@ func checkEmptyTopNs(topNs []*TopN) bool {
return count == 0
}

// SortTopnMeta sorts TopNMeta by count in descending order, breaking ties by the encoded value.
func SortTopnMeta(topnMetas []TopNMeta) []TopNMeta {
slices.SortFunc(topnMetas, func(i, j TopNMeta) bool {
if i.Count != j.Count {
return i.Count > j.Count
}
return bytes.Compare(i.Encoded, j.Encoded) < 0
})
return topnMetas
}

// GetMergedTopNFromSortedSlice exposes getMergedTopNFromSortedSlice, returning the merged global TopN and the leftover TopNMeta.
func GetMergedTopNFromSortedSlice(sorted []TopNMeta, n uint32) (*TopN, []TopNMeta) {
return getMergedTopNFromSortedSlice(sorted, n)
}

func getMergedTopNFromSortedSlice(sorted []TopNMeta, n uint32) (*TopN, []TopNMeta) {
slices.SortFunc(sorted, func(i, j TopNMeta) bool {
if i.Count != j.Count {
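As a side note (not part of the diff), a minimal standalone sketch of the ordering provided by the two newly exported helpers; the sample values and the wrapping main package are hypothetical:

package main

import (
    "fmt"

    "github.com/pingcap/tidb/statistics"
)

func main() {
    // Hypothetical partition-level TopN entries collected from several workers.
    metas := []statistics.TopNMeta{
        {Encoded: []byte("b"), Count: 10},
        {Encoded: []byte("a"), Count: 10},
        {Encoded: []byte("c"), Count: 42},
    }
    // SortTopnMeta orders by count descending, breaking ties by the encoded value:
    // ("c", 42), ("a", 10), ("b", 10).
    sorted := statistics.SortTopnMeta(metas)
    // Keep the 2 most frequent values as the global TopN; the remainder is returned
    // so it can be merged back into the histograms later.
    topn, leftover := statistics.GetMergedTopNFromSortedSlice(sorted, 2)
    fmt.Println(topn, leftover)
}
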
112 changes: 108 additions & 4 deletions statistics/handle/handle.go
@@ -15,6 +15,7 @@
package handle

import (
"bytes"
"context"
"encoding/json"
"fmt"
@@ -28,7 +29,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl/util"
ddlUtil "github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
@@ -41,6 +42,7 @@
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mathutil"
@@ -56,6 +58,9 @@
const (
// TiDBGlobalStats represents the global-stats for a partitioned table.
TiDBGlobalStats = "global"

// maxPartitionMergeBatchSize indicates the max batch size for a worker to merge partition stats
maxPartitionMergeBatchSize = 256
)

// Handle can update stats info periodically.
Expand Down Expand Up @@ -83,7 +88,7 @@ type Handle struct {

// ddlEventCh is a channel to notify a ddl operation has happened.
// It is sent only by owner or the drop stats executor, and read by stats handle.
ddlEventCh chan *util.Event
ddlEventCh chan *ddlUtil.Event
// listHead contains all the stats collector required by session.
listHead *SessionStatsCollector
// globalMap contains all the delta map from collectors when we dump them to KV.
@@ -197,7 +202,7 @@ type sessionPool interface {
func NewHandle(ctx sessionctx.Context, lease time.Duration, pool sessionPool, tracker sessionctx.SysProcTracker, serverIDGetter func() uint64) (*Handle, error) {
cfg := config.GetGlobalConfig()
handle := &Handle{
ddlEventCh: make(chan *util.Event, 100),
ddlEventCh: make(chan *ddlUtil.Event, 100),
listHead: &SessionStatsCollector{mapper: make(tableDeltaMap), rateMap: make(errorRateDeltaMap)},
idxUsageListHead: &SessionIndexUsageCollector{mapper: make(indexUsageMap)},
pool: pool,
@@ -547,7 +552,8 @@ func (h *Handle) mergePartitionStats2GlobalStats(sc sessionctx.Context,
// Because after merging TopN, some numbers will be left.
// These remaining topN numbers will be used as a separate bucket for later histogram merging.
var popedTopN []statistics.TopNMeta
globalStats.TopN[i], popedTopN, allHg[i], err = statistics.MergePartTopN2GlobalTopN(sc.GetSessionVars().StmtCtx, sc.GetSessionVars().AnalyzeVersion, allTopN[i], uint32(opts[ast.AnalyzeOptNumTopN]), allHg[i], isIndex == 1)
wrapper := statistics.NewStatsWrapper(allHg[i], allTopN[i])
globalStats.TopN[i], popedTopN, allHg[i], err = h.mergeGlobalStatsTopN(sc, wrapper, sc.GetSessionVars().StmtCtx.TimeZone, sc.GetSessionVars().AnalyzeVersion, uint32(opts[ast.AnalyzeOptNumTopN]), isIndex == 1)
if err != nil {
return
}
@@ -579,6 +585,104 @@ func (h *Handle) mergePartitionStats2GlobalStats(sc sessionctx.Context,
return
}

Review thread on mergeGlobalStatsTopN:
Contributor: Should we handle the version first? For example, if version == 1, just return.
Author: Will the statistics version affect this?
Contributor: Only version 2 has a TopN. Maybe for version 1 we don't need to merge the TopN at all?

func (h *Handle) mergeGlobalStatsTopN(sc sessionctx.Context, wrapper *statistics.StatsWrapper,
timeZone *time.Location, version int, n uint32, isIndex bool) (*statistics.TopN,
[]statistics.TopNMeta, []*statistics.Histogram, error) {
mergeConcurrency := sc.GetSessionVars().AnalyzePartitionMergeConcurrency
// fall back to the original serial merge when the concurrency is 1
if mergeConcurrency < 2 {
return statistics.MergePartTopN2GlobalTopN(timeZone, version, wrapper.AllTopN, n, wrapper.AllHg, isIndex)
}
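// For example (hypothetical numbers): with 600 partition-level TopNs and a merge concurrency of 4,
// the batch size below is 150; with 2000 partitions it would be capped at maxPartitionMergeBatchSize (256).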
batchSize := len(wrapper.AllTopN) / mergeConcurrency
if batchSize < 1 {
batchSize = 1
} else if batchSize > maxPartitionMergeBatchSize {
batchSize = maxPartitionMergeBatchSize
}
return h.mergeGlobalStatsTopNByConcurrency(mergeConcurrency, batchSize, wrapper, timeZone, version, n, isIndex)
}

// mergeGlobalStatsTopNByConcurrency merges the partition-level TopNs into a global TopN concurrently.
// The partition TopNs are split into batches, and each batch is handled by a separate worker.
// mergeConcurrency controls the total number of running workers, and mergeBatchSize controls
// how many partitions each worker handles in one task.
func (h *Handle) mergeGlobalStatsTopNByConcurrency(mergeConcurrency, mergeBatchSize int, wrapper *statistics.StatsWrapper,
timeZone *time.Location, version int, n uint32, isIndex bool) (*statistics.TopN,
[]statistics.TopNMeta, []*statistics.Histogram, error) {
if len(wrapper.AllTopN) < mergeConcurrency {
mergeConcurrency = len(wrapper.AllTopN)
}
tasks := make([]*statistics.TopnStatsMergeTask, 0)
Review thread on this line:
Contributor: Maybe the capacity could be len(wrapper.AllTopN) + 1?
Author: I think it's not necessary.

for start := 0; start < len(wrapper.AllTopN); {
end := start + mergeBatchSize
if end > len(wrapper.AllTopN) {
end = len(wrapper.AllTopN)
}
task := statistics.NewTopnStatsMergeTask(start, end)
tasks = append(tasks, task)
start = end
}
var wg util.WaitGroupWrapper
taskNum := len(tasks)
taskCh := make(chan *statistics.TopnStatsMergeTask, taskNum)
respCh := make(chan *statistics.TopnStatsMergeResponse, taskNum)
for i := 0; i < mergeConcurrency; i++ {
worker := statistics.NewTopnStatsMergeWorker(taskCh, respCh, wrapper)
wg.Run(func() {
worker.Run(timeZone, isIndex, n, version)
})
}
for _, task := range tasks {
taskCh <- task
}
close(taskCh)
wg.Wait()
close(respCh)
resps := make([]*statistics.TopnStatsMergeResponse, 0)

// collect the responses and check for errors
hasErr := false
for resp := range respCh {
if resp.Err != nil {
hasErr = true
}
resps = append(resps, resp)
}
if hasErr {
errMsg := make([]string, 0)
for _, resp := range resps {
if resp.Err != nil {
errMsg = append(errMsg, resp.Err.Error())
}
}
return nil, nil, nil, errors.New(strings.Join(errMsg, ","))
}

// merge the responses from all workers into the global TopN stats
sorted := make([]statistics.TopNMeta, 0, mergeConcurrency)
leftTopn := make([]statistics.TopNMeta, 0)
for _, resp := range resps {
if resp.TopN != nil {
sorted = append(sorted, resp.TopN.TopN...)
}
leftTopn = append(leftTopn, resp.PopedTopn...)
for i, removeTopn := range resp.RemoveVals {
// Remove the value from the Hists.
if len(removeTopn) > 0 {
tmp := removeTopn
slices.SortFunc(tmp, func(i, j statistics.TopNMeta) bool {
cmpResult := bytes.Compare(i.Encoded, j.Encoded)
return cmpResult < 0
})
wrapper.AllHg[i].RemoveVals(tmp)
}
}
Comment on lines +669 to +679:
Contributor: Should we gather all of the removeTopN entries for one partition from all responses and remove them from the histogram at once? Could that improve the histogram's accuracy?
Author: The reason I gather all the removed TopN values from the responses is to keep the workers read-only on the stats, in order to avoid a data race.
}

globalTopN, popedTopn := statistics.GetMergedTopNFromSortedSlice(sorted, n)
return globalTopN, statistics.SortTopnMeta(append(leftTopn, popedTopn...)), wrapper.AllHg, nil
}

func (h *Handle) getTableByPhysicalID(is infoschema.InfoSchema, physicalID int64) (table.Table, bool) {
if is.SchemaMetaVersion() != h.mu.schemaVersion {
h.mu.schemaVersion = is.SchemaMetaVersion()
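The task and response types used by the concurrent merge above are defined in statistics package changes that are not shown in this view. A rough, hypothetical reconstruction of their shape, inferred only from the call sites in handle.go (field and constructor names as used by the caller; everything else is an assumption):

// Hypothetical reconstruction; not the actual definitions from the PR.
type TopnStatsMergeTask struct {
    start, end int // the [start, end) range of partition-level TopNs this task covers
}

func NewTopnStatsMergeTask(start, end int) *TopnStatsMergeTask {
    return &TopnStatsMergeTask{start: start, end: end}
}

type TopnStatsMergeResponse struct {
    Err        error
    TopN       *TopN        // merged TopN for this batch of partitions
    PopedTopn  []TopNMeta   // values popped out of the per-batch TopN
    RemoveVals [][]TopNMeta // per-partition values to remove from the histograms
}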