Skip to content

Commit

Permalink
executor: split hashjoin part5 (#39386)
Browse files Browse the repository at this point in the history
ref #39061
  • Loading branch information
XuHuaiyu authored Nov 29, 2022
1 parent d61b8c4 commit d0b72a2
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 107 deletions.
12 changes: 0 additions & 12 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1763,15 +1763,3 @@ func TestDDLBlockedCreateView(t *testing.T) {
dom.DDL().SetHook(hook)
tk.MustExec("alter table t modify column a char(10)")
}

func TestMDLPutETCDFail(t *testing.T) {
store := testkit.CreateMockStore(t)

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t(a int)")

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/putEtcdFailed", `return(true)`))
defer require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/putEtcdFailed"))
tk.MustExec("alter table t add column b int")
}
6 changes: 0 additions & 6 deletions ddl/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,6 @@ func (s *MockSchemaSyncer) WatchGlobalSchemaVer(context.Context) {}

// UpdateSelfVersion implements SchemaSyncer.UpdateSelfVersion interface.
func (s *MockSchemaSyncer) UpdateSelfVersion(ctx context.Context, jobID int64, version int64) error {
failpoint.Inject("putEtcdFailed", func() {
if mockDDLErrOnce < 3 {
mockDDLErrOnce++
failpoint.Return(errors.New("mock putEtcdFailed"))
}
})
if variable.EnableMDL.Load() {
s.mdlSchemaVersions.Store(jobID, version)
} else {
Expand Down
1 change: 0 additions & 1 deletion domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -724,7 +724,6 @@ func (do *Domain) mdlCheckLoop() {
err := do.ddl.SchemaSyncer().UpdateSelfVersion(context.Background(), jobID, ver)
if err != nil {
logutil.BgLogger().Warn("update self version failed", zap.Error(err))
jobNeedToSync = true
} else {
jobCache[jobID] = ver
}
Expand Down
39 changes: 20 additions & 19 deletions executor/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -906,41 +906,42 @@ func prepare4HashJoin(testCase *hashJoinTestCase, innerExec, outerExec Executor)
joinSchema.Append(cols1...)
}

joinKeys := make([]*expression.Column, 0, len(testCase.keyIdx))
for _, keyIdx := range testCase.keyIdx {
joinKeys = append(joinKeys, cols0[keyIdx])
}
probeKeys := make([]*expression.Column, 0, len(testCase.keyIdx))
for _, keyIdx := range testCase.keyIdx {
probeKeys = append(probeKeys, cols1[keyIdx])
}
joinKeysColIdx := make([]int, 0, len(testCase.keyIdx))
joinKeysColIdx = append(joinKeysColIdx, testCase.keyIdx...)
probeKeysColIdx := make([]int, 0, len(testCase.keyIdx))
probeKeysColIdx = append(probeKeysColIdx, testCase.keyIdx...)
e := &HashJoinExec{
baseExecutor: newBaseExecutor(testCase.ctx, joinSchema, 5, innerExec, outerExec),
hashJoinCtx: &hashJoinCtx{
joinType: testCase.joinType, // 0 for InnerJoin, 1 for LeftOutersJoin, 2 for RightOuterJoin
isOuterJoin: false,
useOuterToBuild: testCase.useOuterToBuild,
concurrency: uint(testCase.concurrency),
probeTypes: retTypes(outerExec),
buildTypes: retTypes(innerExec),
},
probeSideTupleFetcher: &probeSideTupleFetcher{
probeSideExec: outerExec,
},
probeWorkers: make([]probeWorker, testCase.concurrency),
buildKeys: joinKeys,
probeKeys: probeKeys,
buildSideExec: innerExec,
buildSideEstCount: float64(testCase.rows),
probeWorkers: make([]*probeWorker, testCase.concurrency),
buildWorker: &buildWorker{
buildKeyColIdx: joinKeysColIdx,
buildSideExec: innerExec,
},
}

childrenUsedSchema := markChildrenUsedCols(e.Schema(), e.children[0].Schema(), e.children[1].Schema())
defaultValues := make([]types.Datum, e.buildSideExec.Schema().Len())
defaultValues := make([]types.Datum, e.buildWorker.buildSideExec.Schema().Len())
lhsTypes, rhsTypes := retTypes(innerExec), retTypes(outerExec)
for i := uint(0); i < e.concurrency; i++ {
e.probeWorkers[i].workerID = i
e.probeWorkers[i].sessCtx = e.ctx
e.probeWorkers[i].hashJoinCtx = e.hashJoinCtx
e.probeWorkers[i].joiner = newJoiner(testCase.ctx, e.joinType, true, defaultValues,
nil, lhsTypes, rhsTypes, childrenUsedSchema, false)
e.probeWorkers[i] = &probeWorker{
workerID: i,
sessCtx: e.ctx,
hashJoinCtx: e.hashJoinCtx,
joiner: newJoiner(testCase.ctx, e.joinType, true, defaultValues,
nil, lhsTypes, rhsTypes, childrenUsedSchema, false),
probeKeyColIdx: probeKeysColIdx,
}
}
memLimit := int64(-1)
if testCase.disk {
Expand Down
65 changes: 38 additions & 27 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1400,17 +1400,6 @@ func (b *executorBuilder) buildMergeJoin(v *plannercore.PhysicalMergeJoin) Execu
return e
}

func (b *executorBuilder) buildSideEstCount(v *plannercore.PhysicalHashJoin) float64 {
buildSide := v.Children()[v.InnerChildIdx]
if v.UseOuterToBuild {
buildSide = v.Children()[1-v.InnerChildIdx]
}
if buildSide.Stats().HistColl == nil || buildSide.Stats().HistColl.Pseudo {
return 0.0
}
return buildSide.StatsCount()
}

func (b *executorBuilder) buildHashJoin(v *plannercore.PhysicalHashJoin) Executor {
leftExec := b.build(v.Children()[0])
if b.err != nil {
Expand All @@ -1425,6 +1414,8 @@ func (b *executorBuilder) buildHashJoin(v *plannercore.PhysicalHashJoin) Executo
e := &HashJoinExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID(), leftExec, rightExec),
probeSideTupleFetcher: &probeSideTupleFetcher{},
probeWorkers: make([]*probeWorker, v.Concurrency),
buildWorker: &buildWorker{},
hashJoinCtx: &hashJoinCtx{
isOuterJoin: v.JoinType.IsOuterJoin(),
useOuterToBuild: v.UseOuterToBuild,
Expand All @@ -1449,15 +1440,17 @@ func (b *executorBuilder) buildHashJoin(v *plannercore.PhysicalHashJoin) Executo
leftIsBuildSide := true

e.isNullEQ = v.IsNullEQ
var probeKeys, probeNAKeys, buildKeys, buildNAKeys []*expression.Column
var buildSideExec Executor
if v.UseOuterToBuild {
// update the buildSideEstCount due to changing the build side
if v.InnerChildIdx == 1 {
e.buildSideExec, e.buildKeys, e.buildNAKeys = leftExec, v.LeftJoinKeys, v.LeftNAJoinKeys
e.probeSideTupleFetcher.probeSideExec, e.probeKeys, e.probeNAKeys = rightExec, v.RightJoinKeys, v.RightNAJoinKeys
buildSideExec, buildKeys, buildNAKeys = leftExec, v.LeftJoinKeys, v.LeftNAJoinKeys
e.probeSideTupleFetcher.probeSideExec, probeKeys, probeNAKeys = rightExec, v.RightJoinKeys, v.RightNAJoinKeys
e.outerFilter = v.LeftConditions
} else {
e.buildSideExec, e.buildKeys, e.buildNAKeys = rightExec, v.RightJoinKeys, v.RightNAJoinKeys
e.probeSideTupleFetcher.probeSideExec, e.probeKeys, e.probeNAKeys = leftExec, v.LeftJoinKeys, v.LeftNAJoinKeys
buildSideExec, buildKeys, buildNAKeys = rightExec, v.RightJoinKeys, v.RightNAJoinKeys
e.probeSideTupleFetcher.probeSideExec, probeKeys, probeNAKeys = leftExec, v.LeftJoinKeys, v.LeftNAJoinKeys
e.outerFilter = v.RightConditions
leftIsBuildSide = false
}
Expand All @@ -1466,30 +1459,48 @@ func (b *executorBuilder) buildHashJoin(v *plannercore.PhysicalHashJoin) Executo
}
} else {
if v.InnerChildIdx == 0 {
e.buildSideExec, e.buildKeys, e.buildNAKeys = leftExec, v.LeftJoinKeys, v.LeftNAJoinKeys
e.probeSideTupleFetcher.probeSideExec, e.probeKeys, e.probeNAKeys = rightExec, v.RightJoinKeys, v.RightNAJoinKeys
buildSideExec, buildKeys, buildNAKeys = leftExec, v.LeftJoinKeys, v.LeftNAJoinKeys
e.probeSideTupleFetcher.probeSideExec, probeKeys, probeNAKeys = rightExec, v.RightJoinKeys, v.RightNAJoinKeys
e.outerFilter = v.RightConditions
} else {
e.buildSideExec, e.buildKeys, e.buildNAKeys = rightExec, v.RightJoinKeys, v.RightNAJoinKeys
e.probeSideTupleFetcher.probeSideExec, e.probeKeys, e.probeNAKeys = leftExec, v.LeftJoinKeys, v.LeftNAJoinKeys
buildSideExec, buildKeys, buildNAKeys = rightExec, v.RightJoinKeys, v.RightNAJoinKeys
e.probeSideTupleFetcher.probeSideExec, probeKeys, probeNAKeys = leftExec, v.LeftJoinKeys, v.LeftNAJoinKeys
e.outerFilter = v.LeftConditions
leftIsBuildSide = false
}
if defaultValues == nil {
defaultValues = make([]types.Datum, e.buildSideExec.Schema().Len())
defaultValues = make([]types.Datum, buildSideExec.Schema().Len())
}
}
probeKeyColIdx := make([]int, len(probeKeys))
probeNAKeColIdx := make([]int, len(probeNAKeys))
buildKeyColIdx := make([]int, len(buildKeys))
buildNAKeyColIdx := make([]int, len(buildNAKeys))
for i := range buildKeys {
buildKeyColIdx[i] = buildKeys[i].Index
}
for i := range buildNAKeys {
buildNAKeyColIdx[i] = buildNAKeys[i].Index
}
for i := range probeKeys {
probeKeyColIdx[i] = probeKeys[i].Index
}
for i := range probeNAKeys {
probeNAKeColIdx[i] = probeNAKeys[i].Index
}
isNAJoin := len(v.LeftNAJoinKeys) > 0
e.buildSideEstCount = b.buildSideEstCount(v)
childrenUsedSchema := markChildrenUsedCols(v.Schema(), v.Children()[0].Schema(), v.Children()[1].Schema())
e.probeWorkers = make([]probeWorker, e.concurrency)
for i := uint(0); i < e.concurrency; i++ {
e.probeWorkers[i].hashJoinCtx = e.hashJoinCtx
e.probeWorkers[i].workerID = i
e.probeWorkers[i].sessCtx = e.ctx
e.probeWorkers[i].joiner = newJoiner(b.ctx, v.JoinType, v.InnerChildIdx == 0, defaultValues,
v.OtherConditions, lhsTypes, rhsTypes, childrenUsedSchema, isNAJoin)
e.probeWorkers[i] = &probeWorker{
hashJoinCtx: e.hashJoinCtx,
workerID: i,
sessCtx: e.ctx,
joiner: newJoiner(b.ctx, v.JoinType, v.InnerChildIdx == 0, defaultValues, v.OtherConditions, lhsTypes, rhsTypes, childrenUsedSchema, isNAJoin),
probeKeyColIdx: probeKeyColIdx,
probeNAKeyColIdx: probeNAKeColIdx,
}
}
e.buildWorker.buildKeyColIdx, e.buildWorker.buildNAKeyColIdx, e.buildWorker.buildSideExec = buildKeyColIdx, buildNAKeyColIdx, buildSideExec
e.hashJoinCtx.isNullAware = isNAJoin
executorCountHashJoinExec.Inc()

Expand Down
2 changes: 1 addition & 1 deletion executor/hash_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ type hashRowContainer struct {
chkBuf *chunk.Chunk
}

func newHashRowContainer(sCtx sessionctx.Context, estCount int, hCtx *hashContext, allTypes []*types.FieldType) *hashRowContainer {
func newHashRowContainer(sCtx sessionctx.Context, hCtx *hashContext, allTypes []*types.FieldType) *hashRowContainer {
maxChunkSize := sCtx.GetSessionVars().MaxChunkSize
rc := chunk.NewRowContainer(allTypes, maxChunkSize)
c := &hashRowContainer{
Expand Down
2 changes: 1 addition & 1 deletion executor/hash_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func testHashRowContainer(t *testing.T, hashFunc func() hash.Hash64, spill bool)
for i := 0; i < numRows; i++ {
hCtx.hashVals = append(hCtx.hashVals, hashFunc())
}
rowContainer := newHashRowContainer(sctx, 0, hCtx, colTypes)
rowContainer := newHashRowContainer(sctx, hCtx, colTypes)
copiedRC = rowContainer.ShallowCopy()
tracker := rowContainer.GetMemTracker()
tracker.SetLabel(memory.LabelForBuildSideResult)
Expand Down
60 changes: 20 additions & 40 deletions executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,10 @@ type probeSideTupleFetcher struct {
type probeWorker struct {
hashJoinCtx *hashJoinCtx
sessCtx sessionctx.Context
workerID uint

workerID uint
probeKeyColIdx []int
probeNAKeyColIdx []int
// We pre-alloc and reuse the Rows and RowPtrs for each probe goroutine, to avoid allocation frequently
buildSideRows []chunk.Row
buildSideRowPtrs []chunk.RowPtr
Expand All @@ -99,20 +101,20 @@ type probeWorker struct {
probeResultCh chan *chunk.Chunk
}

type buildWorker struct {
buildSideExec Executor
buildKeyColIdx []int
buildNAKeyColIdx []int
}

// HashJoinExec implements the hash join algorithm.
type HashJoinExec struct {
baseExecutor
*hashJoinCtx

probeSideTupleFetcher *probeSideTupleFetcher
probeWorkers []probeWorker

buildSideExec Executor
buildSideEstCount float64
probeKeys []*expression.Column
probeNAKeys []*expression.Column
buildKeys []*expression.Column
buildNAKeys []*expression.Column
probeWorkers []*probeWorker
buildWorker *buildWorker

worker util.WaitGroupWrapper
waiter util.WaitGroupWrapper
Expand Down Expand Up @@ -204,12 +206,6 @@ func (e *HashJoinExec) Open(ctx context.Context) error {
e.closeCh = make(chan struct{})
e.finished.Store(false)

if e.probeTypes == nil {
e.probeTypes = retTypes(e.probeSideTupleFetcher.probeSideExec)
}
if e.buildTypes == nil {
e.buildTypes = retTypes(e.buildSideExec)
}
if e.runtimeStats != nil {
e.stats = &hashJoinRuntimeStats{
concurrent: int(e.concurrency),
Expand Down Expand Up @@ -311,8 +307,8 @@ func (e *HashJoinExec) fetchBuildSideRows(ctx context.Context, chkCh chan<- *chu
if e.finished.Load() {
return
}
chk := e.ctx.GetSessionVars().GetNewChunkWithCapacity(e.buildSideExec.base().retFieldTypes, e.ctx.GetSessionVars().MaxChunkSize, e.ctx.GetSessionVars().MaxChunkSize, e.AllocPool)
err = Next(ctx, e.buildSideExec, chk)
chk := e.ctx.GetSessionVars().GetNewChunkWithCapacity(e.buildWorker.buildSideExec.base().retFieldTypes, e.ctx.GetSessionVars().MaxChunkSize, e.ctx.GetSessionVars().MaxChunkSize, e.AllocPool)
err = Next(ctx, e.buildWorker.buildSideExec, chk)
if err != nil {
errCh <- errors.Trace(err)
return
Expand Down Expand Up @@ -373,19 +369,11 @@ func (e *HashJoinExec) fetchAndProbeHashTable(ctx context.Context) {
e.probeSideTupleFetcher.fetchProbeSideChunks(ctx, e.maxChunkSize)
}, e.probeSideTupleFetcher.handleProbeSideFetcherPanic)

probeKeyColIdx := make([]int, len(e.probeKeys))
probeNAKeColIdx := make([]int, len(e.probeNAKeys))
for i := range e.probeKeys {
probeKeyColIdx[i] = e.probeKeys[i].Index
}
for i := range e.probeNAKeys {
probeNAKeColIdx[i] = e.probeNAKeys[i].Index
}
for i := uint(0); i < e.concurrency; i++ {
workerID := i
e.worker.RunWithRecover(func() {
defer trace.StartRegion(ctx, "HashJoinWorker").End()
e.probeWorkers[workerID].runJoinWorker(probeKeyColIdx, probeNAKeColIdx)
e.probeWorkers[workerID].runJoinWorker()
}, e.probeWorkers[workerID].handleProbeWorkerPanic)
}
e.waiter.RunWithRecover(e.waitJoinWorkersAndCloseResultChan, nil)
Expand Down Expand Up @@ -461,7 +449,7 @@ func (e *HashJoinExec) waitJoinWorkersAndCloseResultChan() {
close(e.joinResultCh)
}

func (w *probeWorker) runJoinWorker(probeKeyColIdx, probeNAKeyColIdx []int) {
func (w *probeWorker) runJoinWorker() {
probeTime := int64(0)
if w.hashJoinCtx.stats != nil {
start := time.Now()
Expand All @@ -488,8 +476,8 @@ func (w *probeWorker) runJoinWorker(probeKeyColIdx, probeNAKeyColIdx []int) {
}
hCtx := &hashContext{
allTypes: w.hashJoinCtx.probeTypes,
keyColIdx: probeKeyColIdx,
naKeyColIdx: probeNAKeyColIdx,
keyColIdx: w.probeKeyColIdx,
naKeyColIdx: w.probeNAKeyColIdx,
}
for ok := true; ok; {
if w.hashJoinCtx.finished.Load() {
Expand Down Expand Up @@ -1103,20 +1091,12 @@ func (w *probeWorker) join2ChunkForOuterHashJoin(probeSideChk *chunk.Chunk, hCtx
func (e *HashJoinExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
if !e.prepared {
e.buildFinished = make(chan error, 1)
buildKeyColIdx := make([]int, len(e.buildKeys))
for i := range e.buildKeys {
buildKeyColIdx[i] = e.buildKeys[i].Index
}
buildNAKeyColIdx := make([]int, len(e.buildNAKeys))
for i := range e.buildNAKeys {
buildNAKeyColIdx[i] = e.buildNAKeys[i].Index
}
hCtx := &hashContext{
allTypes: e.buildTypes,
keyColIdx: buildKeyColIdx,
naKeyColIdx: buildNAKeyColIdx,
keyColIdx: e.buildWorker.buildKeyColIdx,
naKeyColIdx: e.buildWorker.buildNAKeyColIdx,
}
e.rowContainer = newHashRowContainer(e.ctx, int(e.buildSideEstCount), hCtx, retTypes(e.buildSideExec))
e.rowContainer = newHashRowContainer(e.ctx, hCtx, retTypes(e.buildWorker.buildSideExec))
// we shallow copies rowContainer for each probe worker to avoid lock contention
for i := uint(0); i < e.concurrency; i++ {
if i == 0 {
Expand Down

0 comments on commit d0b72a2

Please sign in to comment.