From 0fa5328077c3e78bc96d28ff201599510aa4ab86 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Wed, 25 Oct 2023 14:30:09 +0800 Subject: [PATCH 1/4] executor: improve channel length for analyze Signed-off-by: Weizhen Wang --- pkg/executor/analyze.go | 3 +-- pkg/executor/analyze_col_v2.go | 8 ++++---- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/pkg/executor/analyze.go b/pkg/executor/analyze.go index ddfff2f77519c..df50d74889556 100644 --- a/pkg/executor/analyze.go +++ b/pkg/executor/analyze.go @@ -108,8 +108,7 @@ func (e *AnalyzeExec) Next(ctx context.Context, _ *chunk.Chunk) error { // Start workers with channel to collect results. taskCh := make(chan *analyzeTask, concurrency) - resultChLen := min(concurrency*2, len(tasks)) - resultsCh := make(chan *statistics.AnalyzeResults, resultChLen) + resultsCh := make(chan *statistics.AnalyzeResults) for i := 0; i < concurrency; i++ { e.wg.Run(func() { e.analyzeWorker(taskCh, resultsCh) }) } diff --git a/pkg/executor/analyze_col_v2.go b/pkg/executor/analyze_col_v2.go index 40a96efec1afa..6edddfaee8cc1 100644 --- a/pkg/executor/analyze_col_v2.go +++ b/pkg/executor/analyze_col_v2.go @@ -135,7 +135,7 @@ func (e *AnalyzeColumnsExecV2) analyzeColumnsPushDownV2() *statistics.AnalyzeRes e.memTracker.Release(e.memTracker.BytesConsumed()) return &statistics.AnalyzeResults{Err: err, Job: e.job} } - idxNDVPushDownCh := make(chan analyzeIndexNDVTotalResult, 1) + idxNDVPushDownCh := make(chan analyzeIndexNDVTotalResult) // subIndexWorkerWg is better to be initialized in handleNDVForSpecialIndexes, however if we do so, golang would // report unexpected/unreasonable data race error on subIndexWorkerWg when running TestAnalyzeVirtualCol test // case with `-race` flag now. @@ -275,8 +275,8 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats( sc := e.ctx.GetSessionVars().StmtCtx // Start workers to merge the result from collectors. - mergeResultCh := make(chan *samplingMergeResult, samplingStatsConcurrency) - mergeTaskCh := make(chan []byte, samplingStatsConcurrency) + mergeResultCh := make(chan *samplingMergeResult) + mergeTaskCh := make(chan []byte) var taskEg errgroup.Group // Start read data from resultHandler and send them to mergeTaskCh. taskEg.Go(func() (err error) { @@ -381,7 +381,7 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats( hists = make([]*statistics.Histogram, totalLen) topns = make([]*statistics.TopN, totalLen) fmSketches = make([]*statistics.FMSketch, 0, totalLen) - buildResultChan := make(chan error, totalLen) + buildResultChan := make(chan error) buildTaskChan := make(chan *samplingBuildTask, totalLen) if totalLen < samplingStatsConcurrency { samplingStatsConcurrency = totalLen From 0732f6f35607b27db6c0953198fbfb5c132778de Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Wed, 25 Oct 2023 19:37:01 +0800 Subject: [PATCH 2/4] executor: improve channel length for analyze Signed-off-by: Weizhen Wang --- pkg/executor/analyze_col_v2.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/executor/analyze_col_v2.go b/pkg/executor/analyze_col_v2.go index 6edddfaee8cc1..40a96efec1afa 100644 --- a/pkg/executor/analyze_col_v2.go +++ b/pkg/executor/analyze_col_v2.go @@ -135,7 +135,7 @@ func (e *AnalyzeColumnsExecV2) analyzeColumnsPushDownV2() *statistics.AnalyzeRes e.memTracker.Release(e.memTracker.BytesConsumed()) return &statistics.AnalyzeResults{Err: err, Job: e.job} } - idxNDVPushDownCh := make(chan analyzeIndexNDVTotalResult) + idxNDVPushDownCh := make(chan analyzeIndexNDVTotalResult, 1) // subIndexWorkerWg is better to be initialized in handleNDVForSpecialIndexes, however if we do so, golang would // report unexpected/unreasonable data race error on subIndexWorkerWg when running TestAnalyzeVirtualCol test // case with `-race` flag now. @@ -275,8 +275,8 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats( sc := e.ctx.GetSessionVars().StmtCtx // Start workers to merge the result from collectors. - mergeResultCh := make(chan *samplingMergeResult) - mergeTaskCh := make(chan []byte) + mergeResultCh := make(chan *samplingMergeResult, samplingStatsConcurrency) + mergeTaskCh := make(chan []byte, samplingStatsConcurrency) var taskEg errgroup.Group // Start read data from resultHandler and send them to mergeTaskCh. taskEg.Go(func() (err error) { @@ -381,7 +381,7 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats( hists = make([]*statistics.Histogram, totalLen) topns = make([]*statistics.TopN, totalLen) fmSketches = make([]*statistics.FMSketch, 0, totalLen) - buildResultChan := make(chan error) + buildResultChan := make(chan error, totalLen) buildTaskChan := make(chan *samplingBuildTask, totalLen) if totalLen < samplingStatsConcurrency { samplingStatsConcurrency = totalLen From 79fd012a539c7194a87dd12fa5ad9cc441ac1865 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Wed, 25 Oct 2023 20:20:38 +0800 Subject: [PATCH 3/4] * Signed-off-by: Weizhen Wang --- pkg/executor/analyze_col_v2.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/executor/analyze_col_v2.go b/pkg/executor/analyze_col_v2.go index 40a96efec1afa..66deada16368d 100644 --- a/pkg/executor/analyze_col_v2.go +++ b/pkg/executor/analyze_col_v2.go @@ -275,8 +275,8 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats( sc := e.ctx.GetSessionVars().StmtCtx // Start workers to merge the result from collectors. - mergeResultCh := make(chan *samplingMergeResult, samplingStatsConcurrency) - mergeTaskCh := make(chan []byte, samplingStatsConcurrency) + mergeResultCh := make(chan *samplingMergeResult) + mergeTaskCh := make(chan []byte) var taskEg errgroup.Group // Start read data from resultHandler and send them to mergeTaskCh. taskEg.Go(func() (err error) { From 34a2c0a83bf84702d2983552ac303778f5b8630b Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Thu, 26 Oct 2023 11:40:12 +0800 Subject: [PATCH 4/4] * Signed-off-by: Weizhen Wang --- pkg/executor/analyze.go | 2 +- pkg/executor/analyze_col_v2.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/executor/analyze.go b/pkg/executor/analyze.go index df50d74889556..26acb9ce56bca 100644 --- a/pkg/executor/analyze.go +++ b/pkg/executor/analyze.go @@ -108,7 +108,7 @@ func (e *AnalyzeExec) Next(ctx context.Context, _ *chunk.Chunk) error { // Start workers with channel to collect results. taskCh := make(chan *analyzeTask, concurrency) - resultsCh := make(chan *statistics.AnalyzeResults) + resultsCh := make(chan *statistics.AnalyzeResults, 1) for i := 0; i < concurrency; i++ { e.wg.Run(func() { e.analyzeWorker(taskCh, resultsCh) }) } diff --git a/pkg/executor/analyze_col_v2.go b/pkg/executor/analyze_col_v2.go index 66deada16368d..6faa8e0f529a4 100644 --- a/pkg/executor/analyze_col_v2.go +++ b/pkg/executor/analyze_col_v2.go @@ -275,8 +275,8 @@ func (e *AnalyzeColumnsExecV2) buildSamplingStats( sc := e.ctx.GetSessionVars().StmtCtx // Start workers to merge the result from collectors. - mergeResultCh := make(chan *samplingMergeResult) - mergeTaskCh := make(chan []byte) + mergeResultCh := make(chan *samplingMergeResult, 1) + mergeTaskCh := make(chan []byte, 1) var taskEg errgroup.Group // Start read data from resultHandler and send them to mergeTaskCh. taskEg.Go(func() (err error) {