diff --git a/ddl/db_test.go b/ddl/db_test.go index e1347276637e4..ce2ea307e97af 100644 --- a/ddl/db_test.go +++ b/ddl/db_test.go @@ -1763,15 +1763,3 @@ func TestDDLBlockedCreateView(t *testing.T) { dom.DDL().SetHook(hook) tk.MustExec("alter table t modify column a char(10)") } - -func TestMDLPutETCDFail(t *testing.T) { - store := testkit.CreateMockStore(t) - - tk := testkit.NewTestKit(t, store) - tk.MustExec("use test") - tk.MustExec("create table t(a int)") - - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/putEtcdFailed", `return(true)`)) - defer require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/putEtcdFailed")) - tk.MustExec("alter table t add column b int") -} diff --git a/ddl/mock.go b/ddl/mock.go index 9475a5c34ee2e..57a60794f514a 100644 --- a/ddl/mock.go +++ b/ddl/mock.go @@ -66,12 +66,6 @@ func (s *MockSchemaSyncer) WatchGlobalSchemaVer(context.Context) {} // UpdateSelfVersion implements SchemaSyncer.UpdateSelfVersion interface. func (s *MockSchemaSyncer) UpdateSelfVersion(ctx context.Context, jobID int64, version int64) error { - failpoint.Inject("putEtcdFailed", func() { - if mockDDLErrOnce < 3 { - mockDDLErrOnce++ - failpoint.Return(errors.New("mock putEtcdFailed")) - } - }) if variable.EnableMDL.Load() { s.mdlSchemaVersions.Store(jobID, version) } else { diff --git a/domain/domain.go b/domain/domain.go index 899ad458d78eb..66fcf3ca0e3b3 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -724,7 +724,6 @@ func (do *Domain) mdlCheckLoop() { err := do.ddl.SchemaSyncer().UpdateSelfVersion(context.Background(), jobID, ver) if err != nil { logutil.BgLogger().Warn("update self version failed", zap.Error(err)) - jobNeedToSync = true } else { jobCache[jobID] = ver } diff --git a/executor/benchmark_test.go b/executor/benchmark_test.go index 0968c629b4b0d..542ba5d5f963c 100644 --- a/executor/benchmark_test.go +++ b/executor/benchmark_test.go @@ -906,14 +906,10 @@ func prepare4HashJoin(testCase *hashJoinTestCase, innerExec, outerExec Executor) joinSchema.Append(cols1...) } - joinKeys := make([]*expression.Column, 0, len(testCase.keyIdx)) - for _, keyIdx := range testCase.keyIdx { - joinKeys = append(joinKeys, cols0[keyIdx]) - } - probeKeys := make([]*expression.Column, 0, len(testCase.keyIdx)) - for _, keyIdx := range testCase.keyIdx { - probeKeys = append(probeKeys, cols1[keyIdx]) - } + joinKeysColIdx := make([]int, 0, len(testCase.keyIdx)) + joinKeysColIdx = append(joinKeysColIdx, testCase.keyIdx...) + probeKeysColIdx := make([]int, 0, len(testCase.keyIdx)) + probeKeysColIdx = append(probeKeysColIdx, testCase.keyIdx...) e := &HashJoinExec{ baseExecutor: newBaseExecutor(testCase.ctx, joinSchema, 5, innerExec, outerExec), hashJoinCtx: &hashJoinCtx{ @@ -921,26 +917,31 @@ func prepare4HashJoin(testCase *hashJoinTestCase, innerExec, outerExec Executor) isOuterJoin: false, useOuterToBuild: testCase.useOuterToBuild, concurrency: uint(testCase.concurrency), + probeTypes: retTypes(outerExec), + buildTypes: retTypes(innerExec), }, probeSideTupleFetcher: &probeSideTupleFetcher{ probeSideExec: outerExec, }, - probeWorkers: make([]probeWorker, testCase.concurrency), - buildKeys: joinKeys, - probeKeys: probeKeys, - buildSideExec: innerExec, - buildSideEstCount: float64(testCase.rows), + probeWorkers: make([]*probeWorker, testCase.concurrency), + buildWorker: &buildWorker{ + buildKeyColIdx: joinKeysColIdx, + buildSideExec: innerExec, + }, } childrenUsedSchema := markChildrenUsedCols(e.Schema(), e.children[0].Schema(), e.children[1].Schema()) - defaultValues := make([]types.Datum, e.buildSideExec.Schema().Len()) + defaultValues := make([]types.Datum, e.buildWorker.buildSideExec.Schema().Len()) lhsTypes, rhsTypes := retTypes(innerExec), retTypes(outerExec) for i := uint(0); i < e.concurrency; i++ { - e.probeWorkers[i].workerID = i - e.probeWorkers[i].sessCtx = e.ctx - e.probeWorkers[i].hashJoinCtx = e.hashJoinCtx - e.probeWorkers[i].joiner = newJoiner(testCase.ctx, e.joinType, true, defaultValues, - nil, lhsTypes, rhsTypes, childrenUsedSchema, false) + e.probeWorkers[i] = &probeWorker{ + workerID: i, + sessCtx: e.ctx, + hashJoinCtx: e.hashJoinCtx, + joiner: newJoiner(testCase.ctx, e.joinType, true, defaultValues, + nil, lhsTypes, rhsTypes, childrenUsedSchema, false), + probeKeyColIdx: probeKeysColIdx, + } } memLimit := int64(-1) if testCase.disk { diff --git a/executor/builder.go b/executor/builder.go index eb8825dd0ce99..3d015849aa5ef 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -1400,17 +1400,6 @@ func (b *executorBuilder) buildMergeJoin(v *plannercore.PhysicalMergeJoin) Execu return e } -func (b *executorBuilder) buildSideEstCount(v *plannercore.PhysicalHashJoin) float64 { - buildSide := v.Children()[v.InnerChildIdx] - if v.UseOuterToBuild { - buildSide = v.Children()[1-v.InnerChildIdx] - } - if buildSide.Stats().HistColl == nil || buildSide.Stats().HistColl.Pseudo { - return 0.0 - } - return buildSide.StatsCount() -} - func (b *executorBuilder) buildHashJoin(v *plannercore.PhysicalHashJoin) Executor { leftExec := b.build(v.Children()[0]) if b.err != nil { @@ -1425,6 +1414,8 @@ func (b *executorBuilder) buildHashJoin(v *plannercore.PhysicalHashJoin) Executo e := &HashJoinExec{ baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID(), leftExec, rightExec), probeSideTupleFetcher: &probeSideTupleFetcher{}, + probeWorkers: make([]*probeWorker, v.Concurrency), + buildWorker: &buildWorker{}, hashJoinCtx: &hashJoinCtx{ isOuterJoin: v.JoinType.IsOuterJoin(), useOuterToBuild: v.UseOuterToBuild, @@ -1449,15 +1440,17 @@ func (b *executorBuilder) buildHashJoin(v *plannercore.PhysicalHashJoin) Executo leftIsBuildSide := true e.isNullEQ = v.IsNullEQ + var probeKeys, probeNAKeys, buildKeys, buildNAKeys []*expression.Column + var buildSideExec Executor if v.UseOuterToBuild { // update the buildSideEstCount due to changing the build side if v.InnerChildIdx == 1 { - e.buildSideExec, e.buildKeys, e.buildNAKeys = leftExec, v.LeftJoinKeys, v.LeftNAJoinKeys - e.probeSideTupleFetcher.probeSideExec, e.probeKeys, e.probeNAKeys = rightExec, v.RightJoinKeys, v.RightNAJoinKeys + buildSideExec, buildKeys, buildNAKeys = leftExec, v.LeftJoinKeys, v.LeftNAJoinKeys + e.probeSideTupleFetcher.probeSideExec, probeKeys, probeNAKeys = rightExec, v.RightJoinKeys, v.RightNAJoinKeys e.outerFilter = v.LeftConditions } else { - e.buildSideExec, e.buildKeys, e.buildNAKeys = rightExec, v.RightJoinKeys, v.RightNAJoinKeys - e.probeSideTupleFetcher.probeSideExec, e.probeKeys, e.probeNAKeys = leftExec, v.LeftJoinKeys, v.LeftNAJoinKeys + buildSideExec, buildKeys, buildNAKeys = rightExec, v.RightJoinKeys, v.RightNAJoinKeys + e.probeSideTupleFetcher.probeSideExec, probeKeys, probeNAKeys = leftExec, v.LeftJoinKeys, v.LeftNAJoinKeys e.outerFilter = v.RightConditions leftIsBuildSide = false } @@ -1466,30 +1459,48 @@ func (b *executorBuilder) buildHashJoin(v *plannercore.PhysicalHashJoin) Executo } } else { if v.InnerChildIdx == 0 { - e.buildSideExec, e.buildKeys, e.buildNAKeys = leftExec, v.LeftJoinKeys, v.LeftNAJoinKeys - e.probeSideTupleFetcher.probeSideExec, e.probeKeys, e.probeNAKeys = rightExec, v.RightJoinKeys, v.RightNAJoinKeys + buildSideExec, buildKeys, buildNAKeys = leftExec, v.LeftJoinKeys, v.LeftNAJoinKeys + e.probeSideTupleFetcher.probeSideExec, probeKeys, probeNAKeys = rightExec, v.RightJoinKeys, v.RightNAJoinKeys e.outerFilter = v.RightConditions } else { - e.buildSideExec, e.buildKeys, e.buildNAKeys = rightExec, v.RightJoinKeys, v.RightNAJoinKeys - e.probeSideTupleFetcher.probeSideExec, e.probeKeys, e.probeNAKeys = leftExec, v.LeftJoinKeys, v.LeftNAJoinKeys + buildSideExec, buildKeys, buildNAKeys = rightExec, v.RightJoinKeys, v.RightNAJoinKeys + e.probeSideTupleFetcher.probeSideExec, probeKeys, probeNAKeys = leftExec, v.LeftJoinKeys, v.LeftNAJoinKeys e.outerFilter = v.LeftConditions leftIsBuildSide = false } if defaultValues == nil { - defaultValues = make([]types.Datum, e.buildSideExec.Schema().Len()) + defaultValues = make([]types.Datum, buildSideExec.Schema().Len()) } } + probeKeyColIdx := make([]int, len(probeKeys)) + probeNAKeColIdx := make([]int, len(probeNAKeys)) + buildKeyColIdx := make([]int, len(buildKeys)) + buildNAKeyColIdx := make([]int, len(buildNAKeys)) + for i := range buildKeys { + buildKeyColIdx[i] = buildKeys[i].Index + } + for i := range buildNAKeys { + buildNAKeyColIdx[i] = buildNAKeys[i].Index + } + for i := range probeKeys { + probeKeyColIdx[i] = probeKeys[i].Index + } + for i := range probeNAKeys { + probeNAKeColIdx[i] = probeNAKeys[i].Index + } isNAJoin := len(v.LeftNAJoinKeys) > 0 - e.buildSideEstCount = b.buildSideEstCount(v) childrenUsedSchema := markChildrenUsedCols(v.Schema(), v.Children()[0].Schema(), v.Children()[1].Schema()) - e.probeWorkers = make([]probeWorker, e.concurrency) for i := uint(0); i < e.concurrency; i++ { - e.probeWorkers[i].hashJoinCtx = e.hashJoinCtx - e.probeWorkers[i].workerID = i - e.probeWorkers[i].sessCtx = e.ctx - e.probeWorkers[i].joiner = newJoiner(b.ctx, v.JoinType, v.InnerChildIdx == 0, defaultValues, - v.OtherConditions, lhsTypes, rhsTypes, childrenUsedSchema, isNAJoin) + e.probeWorkers[i] = &probeWorker{ + hashJoinCtx: e.hashJoinCtx, + workerID: i, + sessCtx: e.ctx, + joiner: newJoiner(b.ctx, v.JoinType, v.InnerChildIdx == 0, defaultValues, v.OtherConditions, lhsTypes, rhsTypes, childrenUsedSchema, isNAJoin), + probeKeyColIdx: probeKeyColIdx, + probeNAKeyColIdx: probeNAKeColIdx, + } } + e.buildWorker.buildKeyColIdx, e.buildWorker.buildNAKeyColIdx, e.buildWorker.buildSideExec = buildKeyColIdx, buildNAKeyColIdx, buildSideExec e.hashJoinCtx.isNullAware = isNAJoin executorCountHashJoinExec.Inc() diff --git a/executor/hash_table.go b/executor/hash_table.go index b7c875148bffa..2ba840d04fdc9 100644 --- a/executor/hash_table.go +++ b/executor/hash_table.go @@ -117,7 +117,7 @@ type hashRowContainer struct { chkBuf *chunk.Chunk } -func newHashRowContainer(sCtx sessionctx.Context, estCount int, hCtx *hashContext, allTypes []*types.FieldType) *hashRowContainer { +func newHashRowContainer(sCtx sessionctx.Context, hCtx *hashContext, allTypes []*types.FieldType) *hashRowContainer { maxChunkSize := sCtx.GetSessionVars().MaxChunkSize rc := chunk.NewRowContainer(allTypes, maxChunkSize) c := &hashRowContainer{ diff --git a/executor/hash_table_test.go b/executor/hash_table_test.go index 3b4a4acee5284..0a387e0e7e5b6 100644 --- a/executor/hash_table_test.go +++ b/executor/hash_table_test.go @@ -127,7 +127,7 @@ func testHashRowContainer(t *testing.T, hashFunc func() hash.Hash64, spill bool) for i := 0; i < numRows; i++ { hCtx.hashVals = append(hCtx.hashVals, hashFunc()) } - rowContainer := newHashRowContainer(sctx, 0, hCtx, colTypes) + rowContainer := newHashRowContainer(sctx, hCtx, colTypes) copiedRC = rowContainer.ShallowCopy() tracker := rowContainer.GetMemTracker() tracker.SetLabel(memory.LabelForBuildSideResult) diff --git a/executor/join.go b/executor/join.go index 1c75252a5e876..bcab7d9a4ea60 100644 --- a/executor/join.go +++ b/executor/join.go @@ -80,8 +80,10 @@ type probeSideTupleFetcher struct { type probeWorker struct { hashJoinCtx *hashJoinCtx sessCtx sessionctx.Context + workerID uint - workerID uint + probeKeyColIdx []int + probeNAKeyColIdx []int // We pre-alloc and reuse the Rows and RowPtrs for each probe goroutine, to avoid allocation frequently buildSideRows []chunk.Row buildSideRowPtrs []chunk.RowPtr @@ -99,20 +101,20 @@ type probeWorker struct { probeResultCh chan *chunk.Chunk } +type buildWorker struct { + buildSideExec Executor + buildKeyColIdx []int + buildNAKeyColIdx []int +} + // HashJoinExec implements the hash join algorithm. type HashJoinExec struct { baseExecutor *hashJoinCtx probeSideTupleFetcher *probeSideTupleFetcher - probeWorkers []probeWorker - - buildSideExec Executor - buildSideEstCount float64 - probeKeys []*expression.Column - probeNAKeys []*expression.Column - buildKeys []*expression.Column - buildNAKeys []*expression.Column + probeWorkers []*probeWorker + buildWorker *buildWorker worker util.WaitGroupWrapper waiter util.WaitGroupWrapper @@ -204,12 +206,6 @@ func (e *HashJoinExec) Open(ctx context.Context) error { e.closeCh = make(chan struct{}) e.finished.Store(false) - if e.probeTypes == nil { - e.probeTypes = retTypes(e.probeSideTupleFetcher.probeSideExec) - } - if e.buildTypes == nil { - e.buildTypes = retTypes(e.buildSideExec) - } if e.runtimeStats != nil { e.stats = &hashJoinRuntimeStats{ concurrent: int(e.concurrency), @@ -311,8 +307,8 @@ func (e *HashJoinExec) fetchBuildSideRows(ctx context.Context, chkCh chan<- *chu if e.finished.Load() { return } - chk := e.ctx.GetSessionVars().GetNewChunkWithCapacity(e.buildSideExec.base().retFieldTypes, e.ctx.GetSessionVars().MaxChunkSize, e.ctx.GetSessionVars().MaxChunkSize, e.AllocPool) - err = Next(ctx, e.buildSideExec, chk) + chk := e.ctx.GetSessionVars().GetNewChunkWithCapacity(e.buildWorker.buildSideExec.base().retFieldTypes, e.ctx.GetSessionVars().MaxChunkSize, e.ctx.GetSessionVars().MaxChunkSize, e.AllocPool) + err = Next(ctx, e.buildWorker.buildSideExec, chk) if err != nil { errCh <- errors.Trace(err) return @@ -373,19 +369,11 @@ func (e *HashJoinExec) fetchAndProbeHashTable(ctx context.Context) { e.probeSideTupleFetcher.fetchProbeSideChunks(ctx, e.maxChunkSize) }, e.probeSideTupleFetcher.handleProbeSideFetcherPanic) - probeKeyColIdx := make([]int, len(e.probeKeys)) - probeNAKeColIdx := make([]int, len(e.probeNAKeys)) - for i := range e.probeKeys { - probeKeyColIdx[i] = e.probeKeys[i].Index - } - for i := range e.probeNAKeys { - probeNAKeColIdx[i] = e.probeNAKeys[i].Index - } for i := uint(0); i < e.concurrency; i++ { workerID := i e.worker.RunWithRecover(func() { defer trace.StartRegion(ctx, "HashJoinWorker").End() - e.probeWorkers[workerID].runJoinWorker(probeKeyColIdx, probeNAKeColIdx) + e.probeWorkers[workerID].runJoinWorker() }, e.probeWorkers[workerID].handleProbeWorkerPanic) } e.waiter.RunWithRecover(e.waitJoinWorkersAndCloseResultChan, nil) @@ -461,7 +449,7 @@ func (e *HashJoinExec) waitJoinWorkersAndCloseResultChan() { close(e.joinResultCh) } -func (w *probeWorker) runJoinWorker(probeKeyColIdx, probeNAKeyColIdx []int) { +func (w *probeWorker) runJoinWorker() { probeTime := int64(0) if w.hashJoinCtx.stats != nil { start := time.Now() @@ -488,8 +476,8 @@ func (w *probeWorker) runJoinWorker(probeKeyColIdx, probeNAKeyColIdx []int) { } hCtx := &hashContext{ allTypes: w.hashJoinCtx.probeTypes, - keyColIdx: probeKeyColIdx, - naKeyColIdx: probeNAKeyColIdx, + keyColIdx: w.probeKeyColIdx, + naKeyColIdx: w.probeNAKeyColIdx, } for ok := true; ok; { if w.hashJoinCtx.finished.Load() { @@ -1103,20 +1091,12 @@ func (w *probeWorker) join2ChunkForOuterHashJoin(probeSideChk *chunk.Chunk, hCtx func (e *HashJoinExec) Next(ctx context.Context, req *chunk.Chunk) (err error) { if !e.prepared { e.buildFinished = make(chan error, 1) - buildKeyColIdx := make([]int, len(e.buildKeys)) - for i := range e.buildKeys { - buildKeyColIdx[i] = e.buildKeys[i].Index - } - buildNAKeyColIdx := make([]int, len(e.buildNAKeys)) - for i := range e.buildNAKeys { - buildNAKeyColIdx[i] = e.buildNAKeys[i].Index - } hCtx := &hashContext{ allTypes: e.buildTypes, - keyColIdx: buildKeyColIdx, - naKeyColIdx: buildNAKeyColIdx, + keyColIdx: e.buildWorker.buildKeyColIdx, + naKeyColIdx: e.buildWorker.buildNAKeyColIdx, } - e.rowContainer = newHashRowContainer(e.ctx, int(e.buildSideEstCount), hCtx, retTypes(e.buildSideExec)) + e.rowContainer = newHashRowContainer(e.ctx, hCtx, retTypes(e.buildWorker.buildSideExec)) // we shallow copies rowContainer for each probe worker to avoid lock contention for i := uint(0); i < e.concurrency; i++ { if i == 0 {