Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: avoid new small objects in probe stage of hash join v2 #55855

Merged
merged 5 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 37 additions & 28 deletions pkg/executor/join/base_join_probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,6 @@ type matchedRowInfo struct {
buildRowOffset int
}

func createMatchRowInfo(probeRowIndex int, buildRowStart unsafe.Pointer) *matchedRowInfo {
ret := &matchedRowInfo{probeRowIndex: probeRowIndex}
*(*unsafe.Pointer)(unsafe.Pointer(&ret.buildRowStart)) = buildRowStart
return ret
}

type posAndHashValue struct {
hashValue uint64
pos int
Expand All @@ -116,7 +110,8 @@ type baseJoinProbe struct {
currentProbeRow int
matchedRowsForCurrentProbeRow int
chunkRows int
cachedBuildRows []*matchedRowInfo
cachedBuildRows []matchedRowInfo
nextCachedBuildRowIndex int

keyIndex []int
keyTypes []*types.FieldType
Expand All @@ -133,7 +128,7 @@ type baseJoinProbe struct {
offsetAndLengthArray []offsetAndLength
// these 3 variables are used for join that has other condition, should be inited when the join has other condition
tmpChk *chunk.Chunk
rowIndexInfos []*matchedRowInfo
rowIndexInfos []matchedRowInfo
selected []bool

probeCollision uint64
Expand All @@ -152,7 +147,7 @@ func (j *baseJoinProbe) IsCurrentChunkProbeDone() bool {
}

func (j *baseJoinProbe) finishCurrentLookupLoop(joinedChk *chunk.Chunk) {
if len(j.cachedBuildRows) > 0 {
if j.nextCachedBuildRowIndex > 0 {
j.batchConstructBuildRows(joinedChk, 0, j.ctx.hasOtherCondition())
}
j.finishLookupCurrentProbeRow()
Expand Down Expand Up @@ -278,9 +273,20 @@ func checkSQLKiller(killer *sqlkiller.SQLKiller, fpName string) error {
return err
}

func (j *baseJoinProbe) appendBuildRowToCachedBuildRowsAndConstructBuildRowsIfNeeded(buildRow *matchedRowInfo, chk *chunk.Chunk, currentColumnIndexInRow int, forOtherCondition bool) {
j.cachedBuildRows = append(j.cachedBuildRows, buildRow)
if len(j.cachedBuildRows) >= batchBuildRowSize {
func (j *baseJoinProbe) appendBuildRowToCachedBuildRowsV2(rowInfo *matchedRowInfo, chk *chunk.Chunk, currentColumnIndexInRow int, forOtherCondition bool) {
j.cachedBuildRows[j.nextCachedBuildRowIndex] = *rowInfo
j.nextCachedBuildRowIndex++
if j.nextCachedBuildRowIndex == batchBuildRowSize {
j.batchConstructBuildRows(chk, currentColumnIndexInRow, forOtherCondition)
}
}

func (j *baseJoinProbe) appendBuildRowToCachedBuildRowsV1(probeRowIndex int, buildRowStart unsafe.Pointer, chk *chunk.Chunk, currentColumnIndexInRow int, forOtherCondition bool) {
j.cachedBuildRows[j.nextCachedBuildRowIndex].probeRowIndex = probeRowIndex
j.cachedBuildRows[j.nextCachedBuildRowIndex].buildRowOffset = 0
*(*unsafe.Pointer)(unsafe.Pointer(&j.cachedBuildRows[j.nextCachedBuildRowIndex].buildRowStart)) = buildRowStart
j.nextCachedBuildRowIndex++
if j.nextCachedBuildRowIndex == batchBuildRowSize {
j.batchConstructBuildRows(chk, currentColumnIndexInRow, forOtherCondition)
}
}
Expand All @@ -290,12 +296,12 @@ func (j *baseJoinProbe) batchConstructBuildRows(chk *chunk.Chunk, currentColumnI
if forOtherCondition {
j.rowIndexInfos = append(j.rowIndexInfos, j.cachedBuildRows...)
}
j.cachedBuildRows = j.cachedBuildRows[:0]
j.nextCachedBuildRowIndex = 0
}

func (j *baseJoinProbe) prepareForProbe(chk *chunk.Chunk) (joinedChk *chunk.Chunk, remainCap int, err error) {
j.offsetAndLengthArray = j.offsetAndLengthArray[:0]
j.cachedBuildRows = j.cachedBuildRows[:0]
j.nextCachedBuildRowIndex = 0
j.matchedRowsForCurrentProbeRow = 0
joinedChk = chk
if j.ctx.OtherCondition != nil {
Expand Down Expand Up @@ -326,15 +332,15 @@ func (j *baseJoinProbe) appendBuildRowToChunk(chk *chunk.Chunk, currentColumnInd
func (j *baseJoinProbe) appendBuildRowToChunkInternal(chk *chunk.Chunk, usedCols []int, forOtherCondition bool, colOffset int, currentColumnInRow int) {
chkRows := chk.NumRows()
needUpdateVirtualRow := currentColumnInRow == 0
if len(usedCols) == 0 || len(j.cachedBuildRows) == 0 {
if len(usedCols) == 0 || j.nextCachedBuildRowIndex == 0 {
if needUpdateVirtualRow {
chk.SetNumVirtualRows(chkRows + len(j.cachedBuildRows))
chk.SetNumVirtualRows(chkRows + j.nextCachedBuildRowIndex)
}
return
}
for i := 0; i < len(j.cachedBuildRows); i++ {
for i := 0; i < j.nextCachedBuildRowIndex; i++ {
if j.cachedBuildRows[i].buildRowOffset == 0 {
j.ctx.hashTableMeta.advanceToRowData(j.cachedBuildRows[i])
j.ctx.hashTableMeta.advanceToRowData(&j.cachedBuildRows[i])
}
}
colIndexMap := make(map[int]int)
Expand Down Expand Up @@ -366,32 +372,32 @@ func (j *baseJoinProbe) appendBuildRowToChunkInternal(chk *chunk.Chunk, usedCols
currentColumn = chk.Column(indexInDstChk)
readNullMapThreadSafe := meta.isReadNullMapThreadSafe(columnIndex)
if readNullMapThreadSafe {
for index := range j.cachedBuildRows {
for index := 0; index < j.nextCachedBuildRowIndex; index++ {
currentColumn.AppendNullBitmap(!meta.isColumnNull(*(*unsafe.Pointer)(unsafe.Pointer(&j.cachedBuildRows[index].buildRowStart)), columnIndex))
j.cachedBuildRows[index].buildRowOffset = chunk.AppendCellFromRawData(currentColumn, *(*unsafe.Pointer)(unsafe.Pointer(&j.cachedBuildRows[index].buildRowStart)), j.cachedBuildRows[index].buildRowOffset)
}
} else {
for index := range j.cachedBuildRows {
for index := 0; index < j.nextCachedBuildRowIndex; index++ {
currentColumn.AppendNullBitmap(!meta.isColumnNullThreadSafe(*(*unsafe.Pointer)(unsafe.Pointer(&j.cachedBuildRows[index].buildRowStart)), columnIndex))
j.cachedBuildRows[index].buildRowOffset = chunk.AppendCellFromRawData(currentColumn, *(*unsafe.Pointer)(unsafe.Pointer(&j.cachedBuildRows[index].buildRowStart)), j.cachedBuildRows[index].buildRowOffset)
}
}
} else {
// not used so don't need to insert into chk, but still need to advance rowData
if meta.columnsSize[columnIndex] < 0 {
for index := range j.cachedBuildRows {
for index := 0; index < j.nextCachedBuildRowIndex; index++ {
size := *(*uint64)(unsafe.Add(*(*unsafe.Pointer)(unsafe.Pointer(&j.cachedBuildRows[index].buildRowStart)), j.cachedBuildRows[index].buildRowOffset))
j.cachedBuildRows[index].buildRowOffset += sizeOfLengthField + int(size)
}
} else {
for index := range j.cachedBuildRows {
for index := 0; index < j.nextCachedBuildRowIndex; index++ {
j.cachedBuildRows[index].buildRowOffset += meta.columnsSize[columnIndex]
}
}
}
}
if needUpdateVirtualRow {
chk.SetNumVirtualRows(chkRows + len(j.cachedBuildRows))
chk.SetNumVirtualRows(chkRows + j.nextCachedBuildRowIndex)
}
}

Expand Down Expand Up @@ -479,14 +485,14 @@ func (j *baseJoinProbe) buildResultAfterOtherCondition(chk *chunk.Chunk, joinedC
}
}
if hasRemainCols {
j.cachedBuildRows = j.cachedBuildRows[:0]
j.nextCachedBuildRowIndex = 0
// build column that is not in joinedChk
for index, result := range j.selected {
if result {
j.appendBuildRowToCachedBuildRowsAndConstructBuildRowsIfNeeded(j.rowIndexInfos[index], chk, j.ctx.hashTableMeta.columnCountNeededForOtherCondition, false)
j.appendBuildRowToCachedBuildRowsV2(&j.rowIndexInfos[index], chk, j.ctx.hashTableMeta.columnCountNeededForOtherCondition, false)
}
}
if len(j.cachedBuildRows) > 0 {
if j.nextCachedBuildRowIndex > 0 {
j.batchConstructBuildRows(chk, j.ctx.hashTableMeta.columnCountNeededForOtherCondition, false)
}
}
Expand Down Expand Up @@ -532,7 +538,8 @@ func NewJoinProbe(ctx *HashJoinCtxV2, workID uint, joinType logicalop.JoinType,
base.hasNullableKey = true
}
}
base.cachedBuildRows = make([]*matchedRowInfo, 0, batchBuildRowSize)
base.cachedBuildRows = make([]matchedRowInfo, batchBuildRowSize)
base.nextCachedBuildRowIndex = 0
base.matchedRowsHeaders = make([]taggedPtr, 0, chunk.InitialCapacity)
base.matchedRowsHashValue = make([]uint64, 0, chunk.InitialCapacity)
base.selRows = make([]int, 0, chunk.InitialCapacity)
Expand All @@ -554,7 +561,7 @@ func NewJoinProbe(ctx *HashJoinCtxV2, workID uint, joinType logicalop.JoinType,
base.tmpChk = chunk.NewChunkWithCapacity(joinedColumnTypes, chunk.InitialCapacity)
base.tmpChk.SetInCompleteChunk(true)
base.selected = make([]bool, 0, chunk.InitialCapacity)
base.rowIndexInfos = make([]*matchedRowInfo, 0, chunk.InitialCapacity)
base.rowIndexInfos = make([]matchedRowInfo, 0, chunk.InitialCapacity)
}
switch joinType {
case logicalop.InnerJoin:
Expand Down Expand Up @@ -606,5 +613,7 @@ func newMockJoinProbe(ctx *HashJoinCtxV2) *mockJoinProbe {
rUsedInOtherCondition: ctx.RUsedInOtherCondition,
rightAsBuildSide: false,
}
base.cachedBuildRows = make([]matchedRowInfo, batchBuildRowSize)
base.nextCachedBuildRowIndex = 0
return &mockJoinProbe{base}
}
2 changes: 1 addition & 1 deletion pkg/executor/join/inner_join_probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (j *innerJoinProbe) Probe(joinResult *hashjoinWorkerResult, sqlKiller *sqlk
candidateRow := tagHelper.toUnsafePointer(j.matchedRowsHeaders[j.currentProbeRow])
if isKeyMatched(meta.keyMode, j.serializedKeys[j.currentProbeRow], candidateRow, meta) {
// key matched, convert row to column for build side
j.appendBuildRowToCachedBuildRowsAndConstructBuildRowsIfNeeded(createMatchRowInfo(j.currentProbeRow, candidateRow), joinedChk, 0, hasOtherCondition)
j.appendBuildRowToCachedBuildRowsV1(j.currentProbeRow, candidateRow, joinedChk, 0, hasOtherCondition)
j.matchedRowsForCurrentProbeRow++
remainCap--
} else {
Expand Down
16 changes: 8 additions & 8 deletions pkg/executor/join/outer_join_probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,15 +113,15 @@ func (j *outerJoinProbe) ScanRowTable(joinResult *hashjoinWorkerResult, sqlKille
if j.rowIter == nil {
panic("scanRowTable before init")
}
j.cachedBuildRows = j.cachedBuildRows[:0]
j.nextCachedBuildRowIndex = 0
meta := j.ctx.hashTableMeta
insertedRows := 0
remainCap := joinResult.chk.RequiredRows() - joinResult.chk.NumRows()
for insertedRows < remainCap && !j.rowIter.isEnd() {
currentRow := j.rowIter.getValue()
if !meta.isCurrentRowUsed(currentRow) {
// append build side of this row
j.appendBuildRowToCachedBuildRowsAndConstructBuildRowsIfNeeded(createMatchRowInfo(0, currentRow), joinResult.chk, 0, false)
j.appendBuildRowToCachedBuildRowsV1(0, currentRow, joinResult.chk, 0, false)
insertedRows++
}
j.rowIter.next()
Expand All @@ -131,7 +131,7 @@ func (j *outerJoinProbe) ScanRowTable(joinResult *hashjoinWorkerResult, sqlKille
joinResult.err = err
return joinResult
}
if len(j.cachedBuildRows) > 0 {
if j.nextCachedBuildRowIndex > 0 {
j.batchConstructBuildRows(joinResult.chk, 0, false)
}
// append probe side in batch
Expand Down Expand Up @@ -176,17 +176,17 @@ func (j *outerJoinProbe) buildResultForMatchedRowsAfterOtherCondition(chk, joine
}
}
if hasRemainCols {
j.cachedBuildRows = j.cachedBuildRows[:0]
j.nextCachedBuildRowIndex = 0
markedJoined = true
meta := j.ctx.hashTableMeta
for index, result := range j.selected {
if result {
rowIndexInfo := j.rowIndexInfos[index]
j.isNotMatchedRows[rowIndexInfo.probeRowIndex] = false
j.appendBuildRowToCachedBuildRowsAndConstructBuildRowsIfNeeded(rowIndexInfo, chk, meta.columnCountNeededForOtherCondition, false)
j.appendBuildRowToCachedBuildRowsV2(&rowIndexInfo, chk, meta.columnCountNeededForOtherCondition, false)
}
}
if len(j.cachedBuildRows) > 0 {
if j.nextCachedBuildRowIndex > 0 {
j.batchConstructBuildRows(chk, meta.columnCountNeededForOtherCondition, false)
}
}
Expand Down Expand Up @@ -248,7 +248,7 @@ func (j *outerJoinProbe) probeForInnerSideBuild(chk, joinedChk *chunk.Chunk, rem
candidateRow := tagHelper.toUnsafePointer(j.matchedRowsHeaders[j.currentProbeRow])
if isKeyMatched(meta.keyMode, j.serializedKeys[j.currentProbeRow], candidateRow, meta) {
// join key match
j.appendBuildRowToCachedBuildRowsAndConstructBuildRowsIfNeeded(createMatchRowInfo(j.currentProbeRow, candidateRow), joinedChk, 0, hasOtherCondition)
j.appendBuildRowToCachedBuildRowsV1(j.currentProbeRow, candidateRow, joinedChk, 0, hasOtherCondition)
if !hasOtherCondition {
// has no other condition, key match mean join match
j.isNotMatchedRows[j.currentProbeRow] = false
Expand Down Expand Up @@ -304,7 +304,7 @@ func (j *outerJoinProbe) probeForOuterSideBuild(chk, joinedChk *chunk.Chunk, rem
candidateRow := tagHelper.toUnsafePointer(j.matchedRowsHeaders[j.currentProbeRow])
if isKeyMatched(meta.keyMode, j.serializedKeys[j.currentProbeRow], candidateRow, meta) {
// join key match
j.appendBuildRowToCachedBuildRowsAndConstructBuildRowsIfNeeded(createMatchRowInfo(j.currentProbeRow, candidateRow), joinedChk, 0, hasOtherCondition)
j.appendBuildRowToCachedBuildRowsV1(j.currentProbeRow, candidateRow, joinedChk, 0, hasOtherCondition)
if !hasOtherCondition {
// has no other condition, key match means join match
meta.setUsedFlag(candidateRow)
Expand Down
12 changes: 6 additions & 6 deletions pkg/executor/join/row_table_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,12 +392,12 @@ func checkColumns(t *testing.T, withSelCol bool, buildFilter expression.CNFExprs
rowStart := rowTables[0].getRowPointer(logicalIndex)
require.NotEqual(t, unsafe.Pointer(nil), rowStart, "row start must not be nil, logical index = "+strconv.Itoa(logicalIndex)+", physical index = "+strconv.Itoa(physicalIndex))
if hasOtherConditionColumns {
mockJoinProber.appendBuildRowToCachedBuildRowsAndConstructBuildRowsIfNeeded(createMatchRowInfo(0, rowStart), tmpChunk, 0, hasOtherConditionColumns)
mockJoinProber.appendBuildRowToCachedBuildRowsV1(0, rowStart, tmpChunk, 0, hasOtherConditionColumns)
} else {
mockJoinProber.appendBuildRowToCachedBuildRowsAndConstructBuildRowsIfNeeded(createMatchRowInfo(0, rowStart), resultChunk, 0, hasOtherConditionColumns)
mockJoinProber.appendBuildRowToCachedBuildRowsV1(0, rowStart, resultChunk, 0, hasOtherConditionColumns)
}
}
if len(mockJoinProber.cachedBuildRows) > 0 {
if mockJoinProber.nextCachedBuildRowIndex > 0 {
if hasOtherConditionColumns {
mockJoinProber.batchConstructBuildRows(tmpChunk, 0, hasOtherConditionColumns)
} else {
Expand Down Expand Up @@ -430,13 +430,13 @@ func checkColumns(t *testing.T, withSelCol bool, buildFilter expression.CNFExprs
rowStart := rowTables[0].getRowPointer(rowIndex)
require.NotEqual(t, unsafe.Pointer(nil), rowStart, "row start must not be nil, logical index = "+strconv.Itoa(logicalIndex)+", physical index = "+strconv.Itoa(physicalIndex))
if hasOtherConditionColumns {
mockJoinProber.appendBuildRowToCachedBuildRowsAndConstructBuildRowsIfNeeded(createMatchRowInfo(0, rowStart), tmpChunk, 0, hasOtherConditionColumns)
mockJoinProber.appendBuildRowToCachedBuildRowsV1(0, rowStart, tmpChunk, 0, hasOtherConditionColumns)
} else {
mockJoinProber.appendBuildRowToCachedBuildRowsAndConstructBuildRowsIfNeeded(createMatchRowInfo(0, rowStart), resultChunk, 0, hasOtherConditionColumns)
mockJoinProber.appendBuildRowToCachedBuildRowsV1(0, rowStart, resultChunk, 0, hasOtherConditionColumns)
}
rowIndex++
}
if len(mockJoinProber.cachedBuildRows) > 0 {
if mockJoinProber.nextCachedBuildRowIndex > 0 {
if hasOtherConditionColumns {
mockJoinProber.batchConstructBuildRows(tmpChunk, 0, hasOtherConditionColumns)
} else {
Expand Down