diff --git a/pkg/executor/join/base_join_probe.go b/pkg/executor/join/base_join_probe.go index 7088122d05e73..8b57a19826390 100644 --- a/pkg/executor/join/base_join_probe.go +++ b/pkg/executor/join/base_join_probe.go @@ -86,12 +86,6 @@ type matchedRowInfo struct { buildRowOffset int } -func createMatchRowInfo(probeRowIndex int, buildRowStart unsafe.Pointer) *matchedRowInfo { - ret := &matchedRowInfo{probeRowIndex: probeRowIndex} - *(*unsafe.Pointer)(unsafe.Pointer(&ret.buildRowStart)) = buildRowStart - return ret -} - type posAndHashValue struct { hashValue uint64 pos int @@ -116,7 +110,8 @@ type baseJoinProbe struct { currentProbeRow int matchedRowsForCurrentProbeRow int chunkRows int - cachedBuildRows []*matchedRowInfo + cachedBuildRows []matchedRowInfo + nextCachedBuildRowIndex int keyIndex []int keyTypes []*types.FieldType @@ -133,7 +128,7 @@ type baseJoinProbe struct { offsetAndLengthArray []offsetAndLength // these 3 variables are used for join that has other condition, should be inited when the join has other condition tmpChk *chunk.Chunk - rowIndexInfos []*matchedRowInfo + rowIndexInfos []matchedRowInfo selected []bool probeCollision uint64 @@ -152,7 +147,7 @@ func (j *baseJoinProbe) IsCurrentChunkProbeDone() bool { } func (j *baseJoinProbe) finishCurrentLookupLoop(joinedChk *chunk.Chunk) { - if len(j.cachedBuildRows) > 0 { + if j.nextCachedBuildRowIndex > 0 { j.batchConstructBuildRows(joinedChk, 0, j.ctx.hasOtherCondition()) } j.finishLookupCurrentProbeRow() @@ -278,9 +273,20 @@ func checkSQLKiller(killer *sqlkiller.SQLKiller, fpName string) error { return err } -func (j *baseJoinProbe) appendBuildRowToCachedBuildRowsAndConstructBuildRowsIfNeeded(buildRow *matchedRowInfo, chk *chunk.Chunk, currentColumnIndexInRow int, forOtherCondition bool) { - j.cachedBuildRows = append(j.cachedBuildRows, buildRow) - if len(j.cachedBuildRows) >= batchBuildRowSize { +func (j *baseJoinProbe) appendBuildRowToCachedBuildRowsV2(rowInfo *matchedRowInfo, chk *chunk.Chunk, 
currentColumnIndexInRow int, forOtherCondition bool) { + j.cachedBuildRows[j.nextCachedBuildRowIndex] = *rowInfo + j.nextCachedBuildRowIndex++ + if j.nextCachedBuildRowIndex == batchBuildRowSize { + j.batchConstructBuildRows(chk, currentColumnIndexInRow, forOtherCondition) + } +} + +func (j *baseJoinProbe) appendBuildRowToCachedBuildRowsV1(probeRowIndex int, buildRowStart unsafe.Pointer, chk *chunk.Chunk, currentColumnIndexInRow int, forOtherCondition bool) { + j.cachedBuildRows[j.nextCachedBuildRowIndex].probeRowIndex = probeRowIndex + j.cachedBuildRows[j.nextCachedBuildRowIndex].buildRowOffset = 0 + *(*unsafe.Pointer)(unsafe.Pointer(&j.cachedBuildRows[j.nextCachedBuildRowIndex].buildRowStart)) = buildRowStart + j.nextCachedBuildRowIndex++ + if j.nextCachedBuildRowIndex == batchBuildRowSize { j.batchConstructBuildRows(chk, currentColumnIndexInRow, forOtherCondition) } } @@ -290,12 +296,12 @@ func (j *baseJoinProbe) batchConstructBuildRows(chk *chunk.Chunk, currentColumnI if forOtherCondition { - j.rowIndexInfos = append(j.rowIndexInfos, j.cachedBuildRows...) + j.rowIndexInfos = append(j.rowIndexInfos, j.cachedBuildRows[:j.nextCachedBuildRowIndex]...) // flush only the filled prefix; slots at or past nextCachedBuildRowIndex are stale
} - j.cachedBuildRows = j.cachedBuildRows[:0] + j.nextCachedBuildRowIndex = 0 } func (j *baseJoinProbe) prepareForProbe(chk *chunk.Chunk) (joinedChk *chunk.Chunk, remainCap int, err error) { j.offsetAndLengthArray = j.offsetAndLengthArray[:0] - j.cachedBuildRows = j.cachedBuildRows[:0] + j.nextCachedBuildRowIndex = 0 j.matchedRowsForCurrentProbeRow = 0 joinedChk = chk if j.ctx.OtherCondition != nil { @@ -326,15 +332,15 @@ func (j *baseJoinProbe) appendBuildRowToChunk(chk *chunk.Chunk, currentColumnInd func (j *baseJoinProbe) appendBuildRowToChunkInternal(chk *chunk.Chunk, usedCols []int, forOtherCondition bool, colOffset int, currentColumnInRow int) { chkRows := chk.NumRows() needUpdateVirtualRow := currentColumnInRow == 0 - if len(usedCols) == 0 || len(j.cachedBuildRows) == 0 { + if len(usedCols) == 0 || j.nextCachedBuildRowIndex == 0 { if needUpdateVirtualRow { - chk.SetNumVirtualRows(chkRows + len(j.cachedBuildRows)) + chk.SetNumVirtualRows(chkRows + j.nextCachedBuildRowIndex) } return } - for i := 0; i < len(j.cachedBuildRows); i++ { + for i := 0; i < j.nextCachedBuildRowIndex; i++ { if j.cachedBuildRows[i].buildRowOffset == 0 { - j.ctx.hashTableMeta.advanceToRowData(j.cachedBuildRows[i]) + j.ctx.hashTableMeta.advanceToRowData(&j.cachedBuildRows[i]) } } colIndexMap := make(map[int]int) @@ -366,12 +372,12 @@ func (j *baseJoinProbe) appendBuildRowToChunkInternal(chk *chunk.Chunk, usedCols currentColumn = chk.Column(indexInDstChk) readNullMapThreadSafe := meta.isReadNullMapThreadSafe(columnIndex) if readNullMapThreadSafe { - for index := range j.cachedBuildRows { + for index := 0; index < j.nextCachedBuildRowIndex; index++ { currentColumn.AppendNullBitmap(!meta.isColumnNull(*(*unsafe.Pointer)(unsafe.Pointer(&j.cachedBuildRows[index].buildRowStart)), columnIndex)) j.cachedBuildRows[index].buildRowOffset = chunk.AppendCellFromRawData(currentColumn, *(*unsafe.Pointer)(unsafe.Pointer(&j.cachedBuildRows[index].buildRowStart)), j.cachedBuildRows[index].buildRowOffset) } 
} else { - for index := range j.cachedBuildRows { + for index := 0; index < j.nextCachedBuildRowIndex; index++ { currentColumn.AppendNullBitmap(!meta.isColumnNullThreadSafe(*(*unsafe.Pointer)(unsafe.Pointer(&j.cachedBuildRows[index].buildRowStart)), columnIndex)) j.cachedBuildRows[index].buildRowOffset = chunk.AppendCellFromRawData(currentColumn, *(*unsafe.Pointer)(unsafe.Pointer(&j.cachedBuildRows[index].buildRowStart)), j.cachedBuildRows[index].buildRowOffset) } @@ -379,19 +385,19 @@ func (j *baseJoinProbe) appendBuildRowToChunkInternal(chk *chunk.Chunk, usedCols } else { // not used so don't need to insert into chk, but still need to advance rowData if meta.columnsSize[columnIndex] < 0 { - for index := range j.cachedBuildRows { + for index := 0; index < j.nextCachedBuildRowIndex; index++ { size := *(*uint64)(unsafe.Add(*(*unsafe.Pointer)(unsafe.Pointer(&j.cachedBuildRows[index].buildRowStart)), j.cachedBuildRows[index].buildRowOffset)) j.cachedBuildRows[index].buildRowOffset += sizeOfLengthField + int(size) } } else { - for index := range j.cachedBuildRows { + for index := 0; index < j.nextCachedBuildRowIndex; index++ { j.cachedBuildRows[index].buildRowOffset += meta.columnsSize[columnIndex] } } } } if needUpdateVirtualRow { - chk.SetNumVirtualRows(chkRows + len(j.cachedBuildRows)) + chk.SetNumVirtualRows(chkRows + j.nextCachedBuildRowIndex) } } @@ -479,14 +485,14 @@ func (j *baseJoinProbe) buildResultAfterOtherCondition(chk *chunk.Chunk, joinedC } } if hasRemainCols { - j.cachedBuildRows = j.cachedBuildRows[:0] + j.nextCachedBuildRowIndex = 0 // build column that is not in joinedChk for index, result := range j.selected { if result { - j.appendBuildRowToCachedBuildRowsAndConstructBuildRowsIfNeeded(j.rowIndexInfos[index], chk, j.ctx.hashTableMeta.columnCountNeededForOtherCondition, false) + j.appendBuildRowToCachedBuildRowsV2(&j.rowIndexInfos[index], chk, j.ctx.hashTableMeta.columnCountNeededForOtherCondition, false) } } - if len(j.cachedBuildRows) > 0 { + if 
j.nextCachedBuildRowIndex > 0 { j.batchConstructBuildRows(chk, j.ctx.hashTableMeta.columnCountNeededForOtherCondition, false) } } @@ -532,7 +538,8 @@ func NewJoinProbe(ctx *HashJoinCtxV2, workID uint, joinType logicalop.JoinType, base.hasNullableKey = true } } - base.cachedBuildRows = make([]*matchedRowInfo, 0, batchBuildRowSize) + base.cachedBuildRows = make([]matchedRowInfo, batchBuildRowSize) + base.nextCachedBuildRowIndex = 0 base.matchedRowsHeaders = make([]taggedPtr, 0, chunk.InitialCapacity) base.matchedRowsHashValue = make([]uint64, 0, chunk.InitialCapacity) base.selRows = make([]int, 0, chunk.InitialCapacity) @@ -554,7 +561,7 @@ func NewJoinProbe(ctx *HashJoinCtxV2, workID uint, joinType logicalop.JoinType, base.tmpChk = chunk.NewChunkWithCapacity(joinedColumnTypes, chunk.InitialCapacity) base.tmpChk.SetInCompleteChunk(true) base.selected = make([]bool, 0, chunk.InitialCapacity) - base.rowIndexInfos = make([]*matchedRowInfo, 0, chunk.InitialCapacity) + base.rowIndexInfos = make([]matchedRowInfo, 0, chunk.InitialCapacity) } switch joinType { case logicalop.InnerJoin: @@ -606,5 +613,7 @@ func newMockJoinProbe(ctx *HashJoinCtxV2) *mockJoinProbe { rUsedInOtherCondition: ctx.RUsedInOtherCondition, rightAsBuildSide: false, } + base.cachedBuildRows = make([]matchedRowInfo, batchBuildRowSize) + base.nextCachedBuildRowIndex = 0 return &mockJoinProbe{base} } diff --git a/pkg/executor/join/inner_join_probe.go b/pkg/executor/join/inner_join_probe.go index 2918c1f533342..1f8537e0c7271 100644 --- a/pkg/executor/join/inner_join_probe.go +++ b/pkg/executor/join/inner_join_probe.go @@ -48,7 +48,7 @@ func (j *innerJoinProbe) Probe(joinResult *hashjoinWorkerResult, sqlKiller *sqlk candidateRow := tagHelper.toUnsafePointer(j.matchedRowsHeaders[j.currentProbeRow]) if isKeyMatched(meta.keyMode, j.serializedKeys[j.currentProbeRow], candidateRow, meta) { // key matched, convert row to column for build side - 
j.appendBuildRowToCachedBuildRowsAndConstructBuildRowsIfNeeded(createMatchRowInfo(j.currentProbeRow, candidateRow), joinedChk, 0, hasOtherCondition) + j.appendBuildRowToCachedBuildRowsV1(j.currentProbeRow, candidateRow, joinedChk, 0, hasOtherCondition) j.matchedRowsForCurrentProbeRow++ remainCap-- } else { diff --git a/pkg/executor/join/outer_join_probe.go b/pkg/executor/join/outer_join_probe.go index ac7a95b834e8f..ba4002ac59655 100644 --- a/pkg/executor/join/outer_join_probe.go +++ b/pkg/executor/join/outer_join_probe.go @@ -113,7 +113,7 @@ func (j *outerJoinProbe) ScanRowTable(joinResult *hashjoinWorkerResult, sqlKille if j.rowIter == nil { panic("scanRowTable before init") } - j.cachedBuildRows = j.cachedBuildRows[:0] + j.nextCachedBuildRowIndex = 0 meta := j.ctx.hashTableMeta insertedRows := 0 remainCap := joinResult.chk.RequiredRows() - joinResult.chk.NumRows() @@ -121,7 +121,7 @@ func (j *outerJoinProbe) ScanRowTable(joinResult *hashjoinWorkerResult, sqlKille currentRow := j.rowIter.getValue() if !meta.isCurrentRowUsed(currentRow) { // append build side of this row - j.appendBuildRowToCachedBuildRowsAndConstructBuildRowsIfNeeded(createMatchRowInfo(0, currentRow), joinResult.chk, 0, false) + j.appendBuildRowToCachedBuildRowsV1(0, currentRow, joinResult.chk, 0, false) insertedRows++ } j.rowIter.next() @@ -131,7 +131,7 @@ func (j *outerJoinProbe) ScanRowTable(joinResult *hashjoinWorkerResult, sqlKille joinResult.err = err return joinResult } - if len(j.cachedBuildRows) > 0 { + if j.nextCachedBuildRowIndex > 0 { j.batchConstructBuildRows(joinResult.chk, 0, false) } // append probe side in batch @@ -176,17 +176,17 @@ func (j *outerJoinProbe) buildResultForMatchedRowsAfterOtherCondition(chk, joine } } if hasRemainCols { - j.cachedBuildRows = j.cachedBuildRows[:0] + j.nextCachedBuildRowIndex = 0 markedJoined = true meta := j.ctx.hashTableMeta for index, result := range j.selected { if result { rowIndexInfo := j.rowIndexInfos[index] 
j.isNotMatchedRows[rowIndexInfo.probeRowIndex] = false - j.appendBuildRowToCachedBuildRowsAndConstructBuildRowsIfNeeded(rowIndexInfo, chk, meta.columnCountNeededForOtherCondition, false) + j.appendBuildRowToCachedBuildRowsV2(&rowIndexInfo, chk, meta.columnCountNeededForOtherCondition, false) } } - if len(j.cachedBuildRows) > 0 { + if j.nextCachedBuildRowIndex > 0 { j.batchConstructBuildRows(chk, meta.columnCountNeededForOtherCondition, false) } } @@ -248,7 +248,7 @@ func (j *outerJoinProbe) probeForInnerSideBuild(chk, joinedChk *chunk.Chunk, rem candidateRow := tagHelper.toUnsafePointer(j.matchedRowsHeaders[j.currentProbeRow]) if isKeyMatched(meta.keyMode, j.serializedKeys[j.currentProbeRow], candidateRow, meta) { // join key match - j.appendBuildRowToCachedBuildRowsAndConstructBuildRowsIfNeeded(createMatchRowInfo(j.currentProbeRow, candidateRow), joinedChk, 0, hasOtherCondition) + j.appendBuildRowToCachedBuildRowsV1(j.currentProbeRow, candidateRow, joinedChk, 0, hasOtherCondition) if !hasOtherCondition { // has no other condition, key match mean join match j.isNotMatchedRows[j.currentProbeRow] = false @@ -304,7 +304,7 @@ func (j *outerJoinProbe) probeForOuterSideBuild(chk, joinedChk *chunk.Chunk, rem candidateRow := tagHelper.toUnsafePointer(j.matchedRowsHeaders[j.currentProbeRow]) if isKeyMatched(meta.keyMode, j.serializedKeys[j.currentProbeRow], candidateRow, meta) { // join key match - j.appendBuildRowToCachedBuildRowsAndConstructBuildRowsIfNeeded(createMatchRowInfo(j.currentProbeRow, candidateRow), joinedChk, 0, hasOtherCondition) + j.appendBuildRowToCachedBuildRowsV1(j.currentProbeRow, candidateRow, joinedChk, 0, hasOtherCondition) if !hasOtherCondition { // has no other condition, key match means join match meta.setUsedFlag(candidateRow) diff --git a/pkg/executor/join/row_table_builder_test.go b/pkg/executor/join/row_table_builder_test.go index 3d529318b6151..77b28eb34aa51 100644 --- a/pkg/executor/join/row_table_builder_test.go +++ 
b/pkg/executor/join/row_table_builder_test.go @@ -392,12 +392,12 @@ func checkColumns(t *testing.T, withSelCol bool, buildFilter expression.CNFExprs rowStart := rowTables[0].getRowPointer(logicalIndex) require.NotEqual(t, unsafe.Pointer(nil), rowStart, "row start must not be nil, logical index = "+strconv.Itoa(logicalIndex)+", physical index = "+strconv.Itoa(physicalIndex)) if hasOtherConditionColumns { - mockJoinProber.appendBuildRowToCachedBuildRowsAndConstructBuildRowsIfNeeded(createMatchRowInfo(0, rowStart), tmpChunk, 0, hasOtherConditionColumns) + mockJoinProber.appendBuildRowToCachedBuildRowsV1(0, rowStart, tmpChunk, 0, hasOtherConditionColumns) } else { - mockJoinProber.appendBuildRowToCachedBuildRowsAndConstructBuildRowsIfNeeded(createMatchRowInfo(0, rowStart), resultChunk, 0, hasOtherConditionColumns) + mockJoinProber.appendBuildRowToCachedBuildRowsV1(0, rowStart, resultChunk, 0, hasOtherConditionColumns) } } - if len(mockJoinProber.cachedBuildRows) > 0 { + if mockJoinProber.nextCachedBuildRowIndex > 0 { if hasOtherConditionColumns { mockJoinProber.batchConstructBuildRows(tmpChunk, 0, hasOtherConditionColumns) } else { @@ -430,13 +430,13 @@ func checkColumns(t *testing.T, withSelCol bool, buildFilter expression.CNFExprs rowStart := rowTables[0].getRowPointer(rowIndex) require.NotEqual(t, unsafe.Pointer(nil), rowStart, "row start must not be nil, logical index = "+strconv.Itoa(logicalIndex)+", physical index = "+strconv.Itoa(physicalIndex)) if hasOtherConditionColumns { - mockJoinProber.appendBuildRowToCachedBuildRowsAndConstructBuildRowsIfNeeded(createMatchRowInfo(0, rowStart), tmpChunk, 0, hasOtherConditionColumns) + mockJoinProber.appendBuildRowToCachedBuildRowsV1(0, rowStart, tmpChunk, 0, hasOtherConditionColumns) } else { - mockJoinProber.appendBuildRowToCachedBuildRowsAndConstructBuildRowsIfNeeded(createMatchRowInfo(0, rowStart), resultChunk, 0, hasOtherConditionColumns) + mockJoinProber.appendBuildRowToCachedBuildRowsV1(0, rowStart, resultChunk, 0, 
hasOtherConditionColumns) } rowIndex++ } - if len(mockJoinProber.cachedBuildRows) > 0 { + if mockJoinProber.nextCachedBuildRowIndex > 0 { if hasOtherConditionColumns { mockJoinProber.batchConstructBuildRows(tmpChunk, 0, hasOtherConditionColumns) } else {