Skip to content

Commit

Permalink
save work
Browse files Browse the repository at this point in the history
Signed-off-by: xufei <[email protected]>
  • Loading branch information
windtalker committed Sep 4, 2024
1 parent 8a06eb4 commit 6fe48b5
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 18 deletions.
12 changes: 3 additions & 9 deletions pkg/executor/join/base_join_probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,6 @@ type matchedRowInfo struct {
buildRowOffset int
}

func createMatchRowInfo(probeRowIndex int, buildRowStart unsafe.Pointer) *matchedRowInfo {
ret := &matchedRowInfo{probeRowIndex: probeRowIndex}
*(*unsafe.Pointer)(unsafe.Pointer(&ret.buildRowStart)) = buildRowStart
return ret
}

type posAndHashValue struct {
hashValue uint64
pos int
Expand Down Expand Up @@ -279,15 +273,15 @@ func checkSQLKiller(killer *sqlkiller.SQLKiller, fpName string) error {
return err
}

func (j *baseJoinProbe) appendBuildRowToCachedBuildRows(rowInfo *matchedRowInfo, chk *chunk.Chunk, currentColumnIndexInRow int, forOtherCondition bool) {
func (j *baseJoinProbe) appendBuildRowToCachedBuildRowsV2(rowInfo *matchedRowInfo, chk *chunk.Chunk, currentColumnIndexInRow int, forOtherCondition bool) {
j.cachedBuildRows[j.nextCachedBuildRowIndex] = *rowInfo
j.nextCachedBuildRowIndex++
if j.nextCachedBuildRowIndex == batchBuildRowSize {
j.batchConstructBuildRows(chk, currentColumnIndexInRow, forOtherCondition)
}
}

func (j *baseJoinProbe) appendBuildRowToCachedBuildRowsAndConstructBuildRowsIfNeeded(probeRowIndex int, buildRowStart unsafe.Pointer, chk *chunk.Chunk, currentColumnIndexInRow int, forOtherCondition bool) {
func (j *baseJoinProbe) appendBuildRowToCachedBuildRowsV1(probeRowIndex int, buildRowStart unsafe.Pointer, chk *chunk.Chunk, currentColumnIndexInRow int, forOtherCondition bool) {
j.cachedBuildRows[j.nextCachedBuildRowIndex].probeRowIndex = probeRowIndex
j.cachedBuildRows[j.nextCachedBuildRowIndex].buildRowOffset = 0
*(*unsafe.Pointer)(unsafe.Pointer(&j.cachedBuildRows[j.nextCachedBuildRowIndex].buildRowStart)) = buildRowStart
Expand Down Expand Up @@ -497,7 +491,7 @@ func (j *baseJoinProbe) buildResultAfterOtherCondition(chk *chunk.Chunk, joinedC
// build column that is not in joinedChk
for index, result := range j.selected {
if result {
j.appendBuildRowToCachedBuildRows(&j.rowIndexInfos[index], chk, j.ctx.hashTableMeta.columnCountNeededForOtherCondition, false)
j.appendBuildRowToCachedBuildRowsV2(&j.rowIndexInfos[index], chk, j.ctx.hashTableMeta.columnCountNeededForOtherCondition, false)
}
}
if len(j.cachedBuildRows) > 0 {
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/join/inner_join_probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (j *innerJoinProbe) Probe(joinResult *hashjoinWorkerResult, sqlKiller *sqlk
candidateRow := tagHelper.toUnsafePointer(j.matchedRowsHeaders[j.currentProbeRow])
if isKeyMatched(meta.keyMode, j.serializedKeys[j.currentProbeRow], candidateRow, meta) {
// key matched, convert row to column for build side
j.appendBuildRowToCachedBuildRowsAndConstructBuildRowsIfNeeded(j.currentProbeRow, candidateRow, joinedChk, 0, hasOtherCondition)
j.appendBuildRowToCachedBuildRowsV1(j.currentProbeRow, candidateRow, joinedChk, 0, hasOtherCondition)
j.matchedRowsForCurrentProbeRow++
remainCap--
} else {
Expand Down
8 changes: 4 additions & 4 deletions pkg/executor/join/outer_join_probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (j *outerJoinProbe) ScanRowTable(joinResult *hashjoinWorkerResult, sqlKille
currentRow := j.rowIter.getValue()
if !meta.isCurrentRowUsed(currentRow) {
// append build side of this row
j.appendBuildRowToCachedBuildRowsAndConstructBuildRowsIfNeeded(0, currentRow, joinResult.chk, 0, false)
j.appendBuildRowToCachedBuildRowsV1(0, currentRow, joinResult.chk, 0, false)
insertedRows++
}
j.rowIter.next()
Expand Down Expand Up @@ -183,7 +183,7 @@ func (j *outerJoinProbe) buildResultForMatchedRowsAfterOtherCondition(chk, joine
if result {
rowIndexInfo := j.rowIndexInfos[index]
j.isNotMatchedRows[rowIndexInfo.probeRowIndex] = false
j.appendBuildRowToCachedBuildRows(&rowIndexInfo, chk, meta.columnCountNeededForOtherCondition, false)
j.appendBuildRowToCachedBuildRowsV2(&rowIndexInfo, chk, meta.columnCountNeededForOtherCondition, false)
}
}
if len(j.cachedBuildRows) > 0 {
Expand Down Expand Up @@ -248,7 +248,7 @@ func (j *outerJoinProbe) probeForInnerSideBuild(chk, joinedChk *chunk.Chunk, rem
candidateRow := tagHelper.toUnsafePointer(j.matchedRowsHeaders[j.currentProbeRow])
if isKeyMatched(meta.keyMode, j.serializedKeys[j.currentProbeRow], candidateRow, meta) {
// join key match
j.appendBuildRowToCachedBuildRowsAndConstructBuildRowsIfNeeded(j.currentProbeRow, candidateRow, joinedChk, 0, hasOtherCondition)
j.appendBuildRowToCachedBuildRowsV1(j.currentProbeRow, candidateRow, joinedChk, 0, hasOtherCondition)
if !hasOtherCondition {
// has no other condition, key match mean join match
j.isNotMatchedRows[j.currentProbeRow] = false
Expand Down Expand Up @@ -304,7 +304,7 @@ func (j *outerJoinProbe) probeForOuterSideBuild(chk, joinedChk *chunk.Chunk, rem
candidateRow := tagHelper.toUnsafePointer(j.matchedRowsHeaders[j.currentProbeRow])
if isKeyMatched(meta.keyMode, j.serializedKeys[j.currentProbeRow], candidateRow, meta) {
// join key match
j.appendBuildRowToCachedBuildRowsAndConstructBuildRowsIfNeeded(j.currentProbeRow, candidateRow, joinedChk, 0, hasOtherCondition)
j.appendBuildRowToCachedBuildRowsV1(j.currentProbeRow, candidateRow, joinedChk, 0, hasOtherCondition)
if !hasOtherCondition {
// has no other condition, key match means join match
meta.setUsedFlag(candidateRow)
Expand Down
8 changes: 4 additions & 4 deletions pkg/executor/join/row_table_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,9 +392,9 @@ func checkColumns(t *testing.T, withSelCol bool, buildFilter expression.CNFExprs
rowStart := rowTables[0].getRowPointer(logicalIndex)
require.NotEqual(t, unsafe.Pointer(nil), rowStart, "row start must not be nil, logical index = "+strconv.Itoa(logicalIndex)+", physical index = "+strconv.Itoa(physicalIndex))
if hasOtherConditionColumns {
mockJoinProber.appendBuildRowToCachedBuildRowsAndConstructBuildRowsIfNeeded(createMatchRowInfo(0, rowStart), tmpChunk, 0, hasOtherConditionColumns)
mockJoinProber.appendBuildRowToCachedBuildRowsV1(0, rowStart, tmpChunk, 0, hasOtherConditionColumns)
} else {
mockJoinProber.appendBuildRowToCachedBuildRowsAndConstructBuildRowsIfNeeded(createMatchRowInfo(0, rowStart), resultChunk, 0, hasOtherConditionColumns)
mockJoinProber.appendBuildRowToCachedBuildRowsV1(0, rowStart, resultChunk, 0, hasOtherConditionColumns)
}
}
if len(mockJoinProber.cachedBuildRows) > 0 {
Expand Down Expand Up @@ -430,9 +430,9 @@ func checkColumns(t *testing.T, withSelCol bool, buildFilter expression.CNFExprs
rowStart := rowTables[0].getRowPointer(rowIndex)
require.NotEqual(t, unsafe.Pointer(nil), rowStart, "row start must not be nil, logical index = "+strconv.Itoa(logicalIndex)+", physical index = "+strconv.Itoa(physicalIndex))
if hasOtherConditionColumns {
mockJoinProber.appendBuildRowToCachedBuildRowsAndConstructBuildRowsIfNeeded(createMatchRowInfo(0, rowStart), tmpChunk, 0, hasOtherConditionColumns)
mockJoinProber.appendBuildRowToCachedBuildRowsV1(0, rowStart, tmpChunk, 0, hasOtherConditionColumns)
} else {
mockJoinProber.appendBuildRowToCachedBuildRowsAndConstructBuildRowsIfNeeded(createMatchRowInfo(0, rowStart), resultChunk, 0, hasOtherConditionColumns)
mockJoinProber.appendBuildRowToCachedBuildRowsV1(0, rowStart, resultChunk, 0, hasOtherConditionColumns)
}
rowIndex++
}
Expand Down

0 comments on commit 6fe48b5

Please sign in to comment.