diff --git a/pkg/executor/join/BUILD.bazel b/pkg/executor/join/BUILD.bazel index 684bef521d4ae..45d71fab745b6 100644 --- a/pkg/executor/join/BUILD.bazel +++ b/pkg/executor/join/BUILD.bazel @@ -15,9 +15,11 @@ go_library( "index_lookup_merge_join.go", "inner_join_probe.go", "join_row_table.go", + "join_table_meta.go", "joiner.go", "merge_join.go", "outer_join_probe.go", + "row_table_builder.go", "tagged_ptr.go", ], importpath = "github.com/pingcap/tidb/pkg/executor/join", @@ -73,6 +75,7 @@ go_test( "inner_join_probe_test.go", "join_row_table_test.go", "join_stats_test.go", + "join_table_meta_test.go", "joiner_test.go", "left_outer_join_probe_test.go", "merge_join_test.go", diff --git a/pkg/executor/join/base_join_probe.go b/pkg/executor/join/base_join_probe.go index 8b57a19826390..a87321c39676d 100644 --- a/pkg/executor/join/base_join_probe.go +++ b/pkg/executor/join/base_join_probe.go @@ -506,7 +506,7 @@ func (j *baseJoinProbe) buildResultAfterOtherCondition(chk *chunk.Chunk, joinedC return } -func isKeyMatched(keyMode keyMode, serializedKey []byte, rowStart unsafe.Pointer, meta *TableMeta) bool { +func isKeyMatched(keyMode keyMode, serializedKey []byte, rowStart unsafe.Pointer, meta *joinTableMeta) bool { switch keyMode { case OneInt64: return *(*int64)(unsafe.Pointer(&serializedKey[0])) == *(*int64)(unsafe.Add(rowStart, meta.nullMapLength+sizeOfNextPtr)) diff --git a/pkg/executor/join/hash_join_v2.go b/pkg/executor/join/hash_join_v2.go index 7a39e68428b9d..c468c9434ae68 100644 --- a/pkg/executor/join/hash_join_v2.go +++ b/pkg/executor/join/hash_join_v2.go @@ -85,7 +85,7 @@ func (htc *hashTableContext) lookup(partitionIndex int, hashValue uint64) tagged return htc.hashTable.tables[partitionIndex].lookup(hashValue, htc.tagHelper) } -func (htc *hashTableContext) getCurrentRowSegment(workerID, partitionID int, tableMeta *TableMeta, allowCreate bool, firstSegSizeHint uint) *rowTableSegment { +func (htc *hashTableContext) getCurrentRowSegment(workerID, partitionID int, 
tableMeta *joinTableMeta, allowCreate bool, firstSegSizeHint uint) *rowTableSegment { if htc.rowTables[workerID][partitionID] == nil { htc.rowTables[workerID][partitionID] = newRowTable(tableMeta) } @@ -115,7 +115,7 @@ func (htc *hashTableContext) finalizeCurrentSeg(workerID, partitionID int, build htc.memoryTracker.Consume(seg.totalUsedBytes()) } -func (htc *hashTableContext) mergeRowTablesToHashTable(tableMeta *TableMeta, partitionNumber uint) int { +func (htc *hashTableContext) mergeRowTablesToHashTable(tableMeta *joinTableMeta, partitionNumber uint) int { rowTables := make([]*rowTable, partitionNumber) for i := 0; i < int(partitionNumber); i++ { rowTables[i] = newRowTable(tableMeta) @@ -157,7 +157,7 @@ type HashJoinCtxV2 struct { ProbeFilter expression.CNFExprs OtherCondition expression.CNFExprs hashTableContext *hashTableContext - hashTableMeta *TableMeta + hashTableMeta *joinTableMeta needScanRowTableAfterProbeDone bool LUsed, RUsed []int diff --git a/pkg/executor/join/join_row_table.go b/pkg/executor/join/join_row_table.go index 2cdd9ff8caa2a..60c149fb2c56e 100644 --- a/pkg/executor/join/join_row_table.go +++ b/pkg/executor/join/join_row_table.go @@ -15,17 +15,8 @@ package join import ( - "hash/fnv" - "sync/atomic" "unsafe" - "github.com/pingcap/tidb/pkg/expression" - "github.com/pingcap/tidb/pkg/parser/mysql" - "github.com/pingcap/tidb/pkg/types" - "github.com/pingcap/tidb/pkg/util/chunk" - "github.com/pingcap/tidb/pkg/util/codec" - "github.com/pingcap/tidb/pkg/util/collate" - "github.com/pingcap/tidb/pkg/util/hack" "github.com/pingcap/tidb/pkg/util/serialization" ) @@ -175,103 +166,8 @@ func getNextRowAddress(rowStart unsafe.Pointer, tagHelper *tagPtrHelper, hashVal return ret } -// TableMeta is the join table meta used in hash join v2 -type TableMeta struct { - // if the row has fixed length - isFixedLength bool - // the row length if the row is fixed length - rowLength int - // if the join keys has fixed length - isJoinKeysFixedLength bool - // the join 
keys length if it is fixed length - joinKeysLength int - // is the join key inlined in the row data, the join key can be inlined if and only if - // 1. keyProb.canBeInlined returns true for all the keys - // 2. there is no duplicate join keys - isJoinKeysInlined bool - // the length of null map, the null map include null bit for each column in the row and the used flag for right semi/outer join - nullMapLength int - // the column order in row layout, as described above, the save column order maybe different from the column order in build schema - // for example, the build schema maybe [col1, col2, col3], and the column order in row maybe [col2, col1, col3], then this array - // is [1, 0, 2] - rowColumnsOrder []int - // the column size of each column, -1 mean variable column, the order is the same as rowColumnsOrder - columnsSize []int - // the serialize mode for each key - serializeModes []codec.SerializeMode - // the first n columns in row is used for other condition, if a join has other condition, we only need to extract - // first n columns from the RowTable to evaluate other condition - columnCountNeededForOtherCondition int - // total column numbers for build side chunk, this is used to construct the chunk if there is join other condition - totalColumnNumber int - // column index offset in null map, will be 1 when if there is usedFlag and 0 if there is no usedFlag - colOffsetInNullMap int - // keyMode is the key mode, it can be OneInt/FixedSerializedKey/VariableSerializedKey - keyMode keyMode - // offset to rowData, -1 for variable length, non-inlined key - rowDataOffset int - // fakeKeyByte is used as the fake key when current join need keep invalid key rows - fakeKeyByte []byte -} - -func (meta *TableMeta) getSerializedKeyLength(rowStart unsafe.Pointer) uint64 { - return *(*uint64)(unsafe.Add(rowStart, sizeOfNextPtr+meta.nullMapLength)) -} - -func (meta *TableMeta) isReadNullMapThreadSafe(columnIndex int) bool { - // Other goroutine will use 
`atomic.StoreUint32` to write to the first 32 bit in nullmap when it need to set usedFlag - // so read from nullMap may meet concurrent write if meta.colOffsetInNullMap == 1 && (columnIndex + meta.colOffsetInNullMap < 32) - mayConcurrentWrite := meta.colOffsetInNullMap == 1 && columnIndex < 31 - return !mayConcurrentWrite -} - -// used in tests -func (meta *TableMeta) getKeyBytes(rowStart unsafe.Pointer) []byte { - switch meta.keyMode { - case OneInt64: - return hack.GetBytesFromPtr(unsafe.Add(rowStart, meta.nullMapLength+sizeOfNextPtr), int(serialization.Uint64Len)) - case FixedSerializedKey: - return hack.GetBytesFromPtr(unsafe.Add(rowStart, meta.nullMapLength+sizeOfNextPtr), meta.joinKeysLength) - case VariableSerializedKey: - return hack.GetBytesFromPtr(unsafe.Add(rowStart, meta.nullMapLength+sizeOfNextPtr+sizeOfLengthField), int(meta.getSerializedKeyLength(rowStart))) - default: - panic("unknown key match type") - } -} - -func (meta *TableMeta) advanceToRowData(matchedRowInfo *matchedRowInfo) { - if meta.rowDataOffset == -1 { - // variable length, non-inlined key - matchedRowInfo.buildRowOffset = sizeOfNextPtr + meta.nullMapLength + sizeOfLengthField + int(meta.getSerializedKeyLength(*(*unsafe.Pointer)(unsafe.Pointer(&matchedRowInfo.buildRowStart)))) - } else { - matchedRowInfo.buildRowOffset = meta.rowDataOffset - } -} - -func (meta *TableMeta) isColumnNull(rowStart unsafe.Pointer, columnIndex int) bool { - byteIndex := (columnIndex + meta.colOffsetInNullMap) / 8 - bitIndex := (columnIndex + meta.colOffsetInNullMap) % 8 - return *(*uint8)(unsafe.Add(rowStart, sizeOfNextPtr+byteIndex))&(uint8(1)<<(7-bitIndex)) != uint8(0) -} - -// for join that need to set UsedFlag during probe stage, read from nullMap is not thread safe for the first 32 bit of nullMap, atomic.LoadUint32 is used to avoid read-write conflict -func (*TableMeta) isColumnNullThreadSafe(rowStart unsafe.Pointer, columnIndex int) bool { - return atomic.LoadUint32((*uint32)(unsafe.Add(rowStart, 
sizeOfNextPtr)))&bitMaskInUint32[columnIndex+1] != uint32(0) -} - -func (*TableMeta) setUsedFlag(rowStart unsafe.Pointer) { - addr := (*uint32)(unsafe.Add(rowStart, sizeOfNextPtr)) - value := atomic.LoadUint32(addr) - value |= usedFlagMask - atomic.StoreUint32(addr, value) -} - -func (*TableMeta) isCurrentRowUsed(rowStart unsafe.Pointer) bool { - return (*(*uint32)(unsafe.Add(rowStart, sizeOfNextPtr)) & usedFlagMask) == usedFlagMask -} - type rowTable struct { - meta *TableMeta + meta *joinTableMeta segments []*rowTableSegment } @@ -298,510 +194,13 @@ func (rt *rowTable) getValidJoinKeyPos(rowIndex int) int { return -1 } -type keyProp struct { - canBeInlined bool - keyLength int - isKeyInteger bool - isKeyUnsigned bool -} - -func getKeyProp(tp *types.FieldType) *keyProp { - switch tp.GetType() { - case mysql.TypeTiny, mysql.TypeShort, mysql.TypeInt24, mysql.TypeLong, mysql.TypeLonglong, mysql.TypeYear, - mysql.TypeDuration: - isKeyUnsigned := mysql.HasUnsignedFlag(tp.GetFlag()) - if tp.GetType() == mysql.TypeYear { - // year type is always unsigned - isKeyUnsigned = true - } else if tp.GetType() == mysql.TypeDuration { - // duration type is always signed - isKeyUnsigned = false - } - return &keyProp{canBeInlined: true, keyLength: chunk.GetFixedLen(tp), isKeyInteger: true, isKeyUnsigned: isKeyUnsigned} - case mysql.TypeVarchar, mysql.TypeVarString, mysql.TypeString, mysql.TypeBlob, mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob: - collator := collate.GetCollator(tp.GetCollate()) - return &keyProp{canBeInlined: collate.CanUseRawMemAsKey(collator), keyLength: chunk.VarElemLen, isKeyInteger: false, isKeyUnsigned: false} - case mysql.TypeDate, mysql.TypeDatetime, mysql.TypeTimestamp: - // date related type will use uint64 as serialized key - return &keyProp{canBeInlined: false, keyLength: int(serialization.Uint64Len), isKeyInteger: true, isKeyUnsigned: true} - case mysql.TypeFloat: - // float will use float64 as serialized key - return 
&keyProp{canBeInlined: false, keyLength: int(serialization.Float64Len), isKeyInteger: false, isKeyUnsigned: false} - case mysql.TypeNewDecimal: - // Although decimal is fixed length, but its key is not fixed length - return &keyProp{canBeInlined: false, keyLength: chunk.VarElemLen, isKeyInteger: false, isKeyUnsigned: false} - case mysql.TypeEnum: - if mysql.HasEnumSetAsIntFlag(tp.GetFlag()) { - // enum int type is always unsigned - return &keyProp{canBeInlined: false, keyLength: int(serialization.Uint64Len), isKeyInteger: true, isKeyUnsigned: true} - } - return &keyProp{canBeInlined: false, keyLength: chunk.VarElemLen, isKeyInteger: false, isKeyUnsigned: false} - case mysql.TypeBit: - // bit type is always unsigned - return &keyProp{canBeInlined: false, keyLength: int(serialization.Uint64Len), isKeyInteger: true, isKeyUnsigned: true} - default: - keyLength := chunk.GetFixedLen(tp) - return &keyProp{canBeInlined: false, keyLength: keyLength, isKeyInteger: false, isKeyUnsigned: false} - } -} - -// buildKeyIndex is the build key column index based on buildSchema, should not be nil -// otherConditionColIndex is the column index that will be used in other condition, if no other condition, will be nil -// columnsNeedConvertToRow is the column index that need to be converted to row, should not be nil -// needUsedFlag is true for outer/semi join that use outer to build -func newTableMeta(buildKeyIndex []int, buildTypes, buildKeyTypes, probeKeyTypes []*types.FieldType, columnsUsedByOtherCondition []int, outputColumns []int, needUsedFlag bool) *TableMeta { - meta := &TableMeta{} - meta.isFixedLength = true - meta.rowLength = 0 - meta.totalColumnNumber = len(buildTypes) - - columnsNeedToBeSaved := make(map[int]struct{}, len(buildTypes)) - updateMeta := func(index int) { - if _, ok := columnsNeedToBeSaved[index]; !ok { - columnsNeedToBeSaved[index] = struct{}{} - length := chunk.GetFixedLen(buildTypes[index]) - if length == chunk.VarElemLen { - meta.isFixedLength = false - } 
else { - meta.rowLength += length - } - } - } - if outputColumns == nil { - // outputColumns = nil means all the column is needed - for index := range buildTypes { - updateMeta(index) - } - } else { - for _, index := range outputColumns { - updateMeta(index) - } - for _, index := range columnsUsedByOtherCondition { - updateMeta(index) - } - } - - meta.isJoinKeysFixedLength = true - meta.joinKeysLength = 0 - meta.isJoinKeysInlined = true - keyIndexMap := make(map[int]struct{}) - meta.serializeModes = make([]codec.SerializeMode, 0, len(buildKeyIndex)) - isAllKeyInteger := true - hasFixedSizeKeyColumn := false - varLengthKeyNumber := 0 - for index, keyIndex := range buildKeyIndex { - keyType := buildKeyTypes[index] - prop := getKeyProp(keyType) - if prop.keyLength != chunk.VarElemLen { - meta.joinKeysLength += prop.keyLength - hasFixedSizeKeyColumn = true - } else { - meta.isJoinKeysFixedLength = false - varLengthKeyNumber++ - } - if !prop.canBeInlined { - meta.isJoinKeysInlined = false - } - if prop.isKeyInteger { - buildUnsigned := prop.isKeyUnsigned - probeKeyProp := getKeyProp(probeKeyTypes[index]) - if !probeKeyProp.isKeyInteger { - panic("build key is integer but probe key is not integer, should not happens") - } - probeUnsigned := probeKeyProp.isKeyUnsigned - if (buildUnsigned && !probeUnsigned) || (probeUnsigned && !buildUnsigned) { - meta.serializeModes = append(meta.serializeModes, codec.NeedSignFlag) - meta.isJoinKeysInlined = false - if meta.isJoinKeysFixedLength { - // an extra sign flag is needed in this case - meta.joinKeysLength++ - } - } else { - meta.serializeModes = append(meta.serializeModes, codec.Normal) - } - } else { - if !prop.isKeyInteger { - isAllKeyInteger = false - } - if prop.keyLength == chunk.VarElemLen { - // keep var column by default for var length column - meta.serializeModes = append(meta.serializeModes, codec.KeepVarColumnLength) - } else { - meta.serializeModes = append(meta.serializeModes, codec.Normal) - } - } - 
keyIndexMap[keyIndex] = struct{}{} - } - if !meta.isJoinKeysFixedLength { - meta.joinKeysLength = -1 - } - if len(buildKeyIndex) != len(keyIndexMap) { - // has duplicated key, can not be inlined - meta.isJoinKeysInlined = false - } - if !meta.isJoinKeysInlined { - if varLengthKeyNumber == 1 { - // if key is not inlined and there is only one var-length key, then don't need to record the var length - for i := 0; i < len(buildKeyIndex); i++ { - if meta.serializeModes[i] == codec.KeepVarColumnLength { - meta.serializeModes[i] = codec.Normal - } - } - } - } else { - for _, index := range buildKeyIndex { - updateMeta(index) - } - } - if !meta.isFixedLength { - meta.rowLength = 0 - } - // construct the column order - meta.rowColumnsOrder = make([]int, 0, len(columnsNeedToBeSaved)) - meta.columnsSize = make([]int, 0, len(columnsNeedToBeSaved)) - usedColumnMap := make(map[int]struct{}, len(columnsNeedToBeSaved)) - - updateColumnOrder := func(index int) { - if _, ok := usedColumnMap[index]; !ok { - meta.rowColumnsOrder = append(meta.rowColumnsOrder, index) - meta.columnsSize = append(meta.columnsSize, chunk.GetFixedLen(buildTypes[index])) - usedColumnMap[index] = struct{}{} - } - } - if meta.isJoinKeysInlined { - // if join key is inlined, the join key will be the first columns - for _, index := range buildKeyIndex { - updateColumnOrder(index) - } - } - meta.columnCountNeededForOtherCondition = 0 - if len(columnsUsedByOtherCondition) > 0 { - // if join has other condition, the columns used by other condition is appended to row layout after the key - for _, index := range columnsUsedByOtherCondition { - updateColumnOrder(index) - } - meta.columnCountNeededForOtherCondition = len(usedColumnMap) - } - if outputColumns == nil { - // outputColumns = nil means all the column is needed - for index := range buildTypes { - updateColumnOrder(index) - } - } else { - for _, index := range outputColumns { - updateColumnOrder(index) - } - } - if isAllKeyInteger && len(buildKeyIndex) == 1 
&& meta.serializeModes[0] != codec.NeedSignFlag { - meta.keyMode = OneInt64 - } else { - if meta.isJoinKeysFixedLength { - meta.keyMode = FixedSerializedKey - } else { - meta.keyMode = VariableSerializedKey - } - } - if needUsedFlag { - meta.colOffsetInNullMap = 1 - // the total row length should be larger than 4 byte since the smallest unit of atomic.LoadXXX is UInt32 - if len(columnsNeedToBeSaved) > 0 { - // the smallest length of a column is 4 byte, so the total row length is enough - meta.nullMapLength = (len(columnsNeedToBeSaved) + 1 + 7) / 8 - } else { - // if no columns need to be converted to row format, then the key is not inlined - // 1. if any of the key columns is fixed length, then the row length is larger than 4 bytes(since the smallest length of a fixed length column is 4 bytes) - // 2. if all the key columns are variable length, there is no guarantee that the row length is larger than 4 byte, the nullmap should be 4 bytes alignment - if hasFixedSizeKeyColumn { - meta.nullMapLength = (len(columnsNeedToBeSaved) + 1 + 7) / 8 - } else { - meta.nullMapLength = ((len(columnsNeedToBeSaved) + 1 + 31) / 32) * 4 - } - } - } else { - meta.colOffsetInNullMap = 0 - meta.nullMapLength = (len(columnsNeedToBeSaved) + 7) / 8 - } - meta.rowDataOffset = -1 - if meta.isJoinKeysInlined { - if meta.isJoinKeysFixedLength { - meta.rowDataOffset = sizeOfNextPtr + meta.nullMapLength - } else { - meta.rowDataOffset = sizeOfNextPtr + meta.nullMapLength + sizeOfLengthField - } - } else { - if meta.isJoinKeysFixedLength { - meta.rowDataOffset = sizeOfNextPtr + meta.nullMapLength + meta.joinKeysLength - } - } - if meta.isJoinKeysFixedLength && !meta.isJoinKeysInlined { - meta.fakeKeyByte = make([]byte, meta.joinKeysLength) - } - return meta -} - -type rowTableBuilder struct { - buildKeyIndex []int - buildKeyTypes []*types.FieldType - hasNullableKey bool - hasFilter bool - keepFilteredRows bool - - serializedKeyVectorBuffer [][]byte - partIdxVector []int - selRows []int - usedRows 
[]int - hashValue []uint64 - firstSegRowSizeHint uint - // filterVector and nullKeyVector is indexed by physical row index because the return vector of VectorizedFilter is based on physical row index - filterVector []bool // if there is filter before probe, filterVector saves the filter result - nullKeyVector []bool // nullKeyVector[i] = true if any of the key is null - - rowNumberInCurrentRowTableSeg []int64 -} - -func createRowTableBuilder(buildKeyIndex []int, buildKeyTypes []*types.FieldType, partitionNumber uint, hasNullableKey bool, hasFilter bool, keepFilteredRows bool) *rowTableBuilder { - builder := &rowTableBuilder{ - buildKeyIndex: buildKeyIndex, - buildKeyTypes: buildKeyTypes, - rowNumberInCurrentRowTableSeg: make([]int64, partitionNumber), - hasNullableKey: hasNullableKey, - hasFilter: hasFilter, - keepFilteredRows: keepFilteredRows, - } - return builder -} - -func (b *rowTableBuilder) initHashValueAndPartIndexForOneChunk(partitionMaskOffset int, partitionNumber uint) { - h := fnv.New64() - fakePartIndex := uint64(0) - for logicalRowIndex, physicalRowIndex := range b.usedRows { - if (b.filterVector != nil && !b.filterVector[physicalRowIndex]) || (b.nullKeyVector != nil && b.nullKeyVector[physicalRowIndex]) { - b.hashValue[logicalRowIndex] = fakePartIndex - b.partIdxVector[logicalRowIndex] = int(fakePartIndex) - fakePartIndex = (fakePartIndex + 1) % uint64(partitionNumber) - continue - } - h.Write(b.serializedKeyVectorBuffer[logicalRowIndex]) - hash := h.Sum64() - b.hashValue[logicalRowIndex] = hash - b.partIdxVector[logicalRowIndex] = int(hash >> partitionMaskOffset) - h.Reset() - } -} - -func (b *rowTableBuilder) processOneChunk(chk *chunk.Chunk, typeCtx types.Context, hashJoinCtx *HashJoinCtxV2, workerID int) error { - b.ResetBuffer(chk) - b.firstSegRowSizeHint = max(uint(1), uint(float64(len(b.usedRows))/float64(hashJoinCtx.partitionNumber)*float64(1.2))) - var err error - if b.hasFilter { - b.filterVector, err = 
expression.VectorizedFilter(hashJoinCtx.SessCtx.GetExprCtx().GetEvalCtx(), hashJoinCtx.SessCtx.GetSessionVars().EnableVectorizedExpression, hashJoinCtx.BuildFilter, chunk.NewIterator4Chunk(chk), b.filterVector) - if err != nil { - return err - } - } - err = checkSQLKiller(&hashJoinCtx.SessCtx.GetSessionVars().SQLKiller, "killedDuringBuild") - if err != nil { - return err - } - // 1. split partition - for index, colIdx := range b.buildKeyIndex { - err := codec.SerializeKeys(typeCtx, chk, b.buildKeyTypes[index], colIdx, b.usedRows, b.filterVector, b.nullKeyVector, hashJoinCtx.hashTableMeta.serializeModes[index], b.serializedKeyVectorBuffer) - if err != nil { - return err - } - } - err = checkSQLKiller(&hashJoinCtx.SessCtx.GetSessionVars().SQLKiller, "killedDuringBuild") - if err != nil { - return err - } - - b.initHashValueAndPartIndexForOneChunk(hashJoinCtx.partitionMaskOffset, hashJoinCtx.partitionNumber) - - // 2. build rowtable - return b.appendToRowTable(chk, hashJoinCtx, workerID) -} - -func resizeSlice[T int | uint64 | bool](s []T, newSize int) []T { - if cap(s) >= newSize { - s = s[:newSize] - } else { - s = make([]T, newSize) - } - return s -} - -func (b *rowTableBuilder) ResetBuffer(chk *chunk.Chunk) { - b.usedRows = chk.Sel() - logicalRows := chk.NumRows() - physicalRows := chk.Column(0).Rows() - - if b.usedRows == nil { - b.selRows = resizeSlice(b.selRows, logicalRows) - for i := 0; i < logicalRows; i++ { - b.selRows[i] = i - } - b.usedRows = b.selRows - } - b.partIdxVector = resizeSlice(b.partIdxVector, logicalRows) - b.hashValue = resizeSlice(b.hashValue, logicalRows) - if b.hasFilter { - b.filterVector = resizeSlice(b.filterVector, physicalRows) - } - if b.hasNullableKey { - b.nullKeyVector = resizeSlice(b.nullKeyVector, physicalRows) - for i := 0; i < physicalRows; i++ { - b.nullKeyVector[i] = false - } - } - if cap(b.serializedKeyVectorBuffer) >= logicalRows { - b.serializedKeyVectorBuffer = b.serializedKeyVectorBuffer[:logicalRows] - for i := 0; i < 
logicalRows; i++ { - b.serializedKeyVectorBuffer[i] = b.serializedKeyVectorBuffer[i][:0] - } - } else { - b.serializedKeyVectorBuffer = make([][]byte, logicalRows) - } -} - -func newRowTable(meta *TableMeta) *rowTable { +func newRowTable(meta *joinTableMeta) *rowTable { return &rowTable{ meta: meta, segments: make([]*rowTableSegment, 0), } } -func (b *rowTableBuilder) appendRemainingRowLocations(workerID int, htCtx *hashTableContext) { - for partID := 0; partID < int(htCtx.hashTable.partitionNumber); partID++ { - if b.rowNumberInCurrentRowTableSeg[partID] > 0 { - htCtx.finalizeCurrentSeg(workerID, partID, b) - } - } -} - -func fillNullMap(rowTableMeta *TableMeta, row *chunk.Row, seg *rowTableSegment) int { - if nullMapLength := rowTableMeta.nullMapLength; nullMapLength > 0 { - bitmap := make([]byte, nullMapLength) - for colIndexInRowTable, colIndexInRow := range rowTableMeta.rowColumnsOrder { - colIndexInBitMap := colIndexInRowTable + rowTableMeta.colOffsetInNullMap - if row.IsNull(colIndexInRow) { - bitmap[colIndexInBitMap/8] |= 1 << (7 - colIndexInBitMap%8) - } - } - seg.rawData = append(seg.rawData, bitmap...) - return nullMapLength - } - return 0 -} - -func fillNextRowPtr(seg *rowTableSegment) int { - seg.rawData = append(seg.rawData, fakeAddrPlaceHolder...) - return sizeOfNextPtr -} - -func (b *rowTableBuilder) fillSerializedKeyAndKeyLengthIfNeeded(rowTableMeta *TableMeta, hasValidKey bool, logicalRowIndex int, seg *rowTableSegment) int { - appendRowLength := 0 - // 1. 
fill key length if needed - if !rowTableMeta.isJoinKeysFixedLength { - // if join_key is not fixed length: `key_length` need to be written in rawData - // even the join keys is inlined, for example if join key is 2 binary string - // then the inlined join key should be: col1_size + col1_data + col2_size + col2_data - // and len(col1_size + col1_data + col2_size + col2_data) need to be written before the inlined join key - length := uint64(0) - if hasValidKey { - length = uint64(len(b.serializedKeyVectorBuffer[logicalRowIndex])) - } else { - length = 0 - } - seg.rawData = append(seg.rawData, unsafe.Slice((*byte)(unsafe.Pointer(&length)), sizeOfLengthField)...) - appendRowLength += sizeOfLengthField - } - // 2. fill serialized key if needed - if !rowTableMeta.isJoinKeysInlined { - // if join_key is not inlined: `serialized_key` need to be written in rawData - if hasValidKey { - seg.rawData = append(seg.rawData, b.serializedKeyVectorBuffer[logicalRowIndex]...) - appendRowLength += len(b.serializedKeyVectorBuffer[logicalRowIndex]) - } else { - // if there is no valid key, and the key is fixed length, then write a fake key - if rowTableMeta.isJoinKeysFixedLength { - seg.rawData = append(seg.rawData, rowTableMeta.fakeKeyByte...) - appendRowLength += rowTableMeta.joinKeysLength - } - // otherwise don't need to write since length is 0 - } - } - return appendRowLength -} - -func fillRowData(rowTableMeta *TableMeta, row *chunk.Row, seg *rowTableSegment) int { - appendRowLength := 0 - for index, colIdx := range rowTableMeta.rowColumnsOrder { - if rowTableMeta.columnsSize[index] > 0 { - // fixed size - seg.rawData = append(seg.rawData, row.GetRaw(colIdx)...) - appendRowLength += rowTableMeta.columnsSize[index] - } else { - // length, raw_data - raw := row.GetRaw(colIdx) - length := uint64(len(raw)) - seg.rawData = append(seg.rawData, unsafe.Slice((*byte)(unsafe.Pointer(&length)), sizeOfLengthField)...) 
- appendRowLength += sizeOfLengthField - seg.rawData = append(seg.rawData, raw...) - appendRowLength += int(length) - } - } - return appendRowLength -} - -func (b *rowTableBuilder) appendToRowTable(chk *chunk.Chunk, hashJoinCtx *HashJoinCtxV2, workerID int) error { - rowTableMeta := hashJoinCtx.hashTableMeta - for logicalRowIndex, physicalRowIndex := range b.usedRows { - if logicalRowIndex%10 == 0 || logicalRowIndex == len(b.usedRows)-1 { - err := checkSQLKiller(&hashJoinCtx.SessCtx.GetSessionVars().SQLKiller, "killedDuringBuild") - if err != nil { - return err - } - } - hasValidKey := (!b.hasFilter || b.filterVector[physicalRowIndex]) && (!b.hasNullableKey || !b.nullKeyVector[physicalRowIndex]) - if !hasValidKey && !b.keepFilteredRows { - continue - } - // need append the row to rowTable - var ( - row = chk.GetRow(logicalRowIndex) - partIdx = b.partIdxVector[logicalRowIndex] - seg *rowTableSegment - ) - seg = hashJoinCtx.hashTableContext.getCurrentRowSegment(workerID, partIdx, hashJoinCtx.hashTableMeta, true, b.firstSegRowSizeHint) - // first check if current seg is full - if b.rowNumberInCurrentRowTableSeg[partIdx] >= maxRowTableSegmentSize || len(seg.rawData) >= maxRowTableSegmentByteSize { - // finalize current seg and create a new seg - hashJoinCtx.hashTableContext.finalizeCurrentSeg(workerID, partIdx, b) - seg = hashJoinCtx.hashTableContext.getCurrentRowSegment(workerID, partIdx, hashJoinCtx.hashTableMeta, true, b.firstSegRowSizeHint) - } - if hasValidKey { - seg.validJoinKeyPos = append(seg.validJoinKeyPos, len(seg.hashValues)) - } - seg.hashValues = append(seg.hashValues, b.hashValue[logicalRowIndex]) - seg.rowStartOffset = append(seg.rowStartOffset, uint64(len(seg.rawData))) - rowLength := 0 - // fill next_row_ptr field - rowLength += fillNextRowPtr(seg) - // fill null_map - rowLength += fillNullMap(rowTableMeta, &row, seg) - // fill serialized key and key length if needed - rowLength += b.fillSerializedKeyAndKeyLengthIfNeeded(rowTableMeta, hasValidKey, 
logicalRowIndex, seg) - // fill row data - rowLength += fillRowData(rowTableMeta, &row, seg) - // to make sure rowLength is 8 bit alignment - if rowLength%8 != 0 { - seg.rawData = append(seg.rawData, fakeAddrPlaceHolder[:8-rowLength%8]...) - } - b.rowNumberInCurrentRowTableSeg[partIdx]++ - } - return nil -} - func (rt *rowTable) merge(other *rowTable) { rt.segments = append(rt.segments, other.segments...) } diff --git a/pkg/executor/join/join_row_table_test.go b/pkg/executor/join/join_row_table_test.go index 721f572e939b6..2b11d9af3053e 100644 --- a/pkg/executor/join/join_row_table_test.go +++ b/pkg/executor/join/join_row_table_test.go @@ -15,14 +15,10 @@ package join import ( - "strconv" "sync/atomic" "testing" "unsafe" - "github.com/pingcap/tidb/pkg/parser/mysql" - "github.com/pingcap/tidb/pkg/types" - "github.com/pingcap/tidb/pkg/util/codec" "github.com/stretchr/testify/require" ) @@ -48,294 +44,4 @@ func TestBitMaskInUint32(t *testing.T) { func TestUintptrCanHoldPointer(t *testing.T) { require.Equal(t, true, sizeOfUintptr >= sizeOfUnsafePointer) -} - -func TestJoinTableMetaKeyMode(t *testing.T) { - tinyTp := types.NewFieldType(mysql.TypeTiny) - intTp := types.NewFieldType(mysql.TypeLonglong) - uintTp := types.NewFieldType(mysql.TypeLonglong) - uintTp.AddFlag(mysql.UnsignedFlag) - yearTp := types.NewFieldType(mysql.TypeYear) - durationTp := types.NewFieldType(mysql.TypeDuration) - enumTp := types.NewFieldType(mysql.TypeEnum) - enumWithIntFlag := types.NewFieldType(mysql.TypeEnum) - enumWithIntFlag.AddFlag(mysql.EnumSetAsIntFlag) - setTp := types.NewFieldType(mysql.TypeSet) - bitTp := types.NewFieldType(mysql.TypeBit) - jsonTp := types.NewFieldType(mysql.TypeJSON) - floatTp := types.NewFieldType(mysql.TypeFloat) - doubleTp := types.NewFieldType(mysql.TypeDouble) - stringTp := types.NewFieldType(mysql.TypeVarString) - dateTp := types.NewFieldType(mysql.TypeDatetime) - decimalTp := types.NewFieldType(mysql.TypeNewDecimal) - - type testCase struct { - buildKeyIndex 
[]int - buildTypes []*types.FieldType - buildKeyTypes []*types.FieldType - probeKeyTypes []*types.FieldType - keyMode keyMode - } - - testCases := []testCase{ - // OneInt64 for some basic fixed size type - {[]int{0}, []*types.FieldType{tinyTp}, []*types.FieldType{tinyTp}, []*types.FieldType{tinyTp}, OneInt64}, - {[]int{0}, []*types.FieldType{yearTp}, []*types.FieldType{yearTp}, []*types.FieldType{yearTp}, OneInt64}, - {[]int{0}, []*types.FieldType{durationTp}, []*types.FieldType{durationTp}, []*types.FieldType{durationTp}, OneInt64}, - {[]int{0}, []*types.FieldType{bitTp}, []*types.FieldType{bitTp}, []*types.FieldType{bitTp}, OneInt64}, - {[]int{0}, []*types.FieldType{intTp}, []*types.FieldType{intTp}, []*types.FieldType{intTp}, OneInt64}, - {[]int{0}, []*types.FieldType{uintTp}, []*types.FieldType{uintTp}, []*types.FieldType{uintTp}, OneInt64}, - {[]int{0}, []*types.FieldType{dateTp}, []*types.FieldType{dateTp}, []*types.FieldType{dateTp}, OneInt64}, - {[]int{0}, []*types.FieldType{enumWithIntFlag}, []*types.FieldType{enumWithIntFlag}, []*types.FieldType{enumWithIntFlag}, OneInt64}, - // fixed serialized key for uint = int - {[]int{0}, []*types.FieldType{intTp}, []*types.FieldType{intTp}, []*types.FieldType{uintTp}, FixedSerializedKey}, - {[]int{0}, []*types.FieldType{uintTp}, []*types.FieldType{uintTp}, []*types.FieldType{intTp}, FixedSerializedKey}, - // fixed serialized key for float/double - {[]int{0}, []*types.FieldType{floatTp}, []*types.FieldType{floatTp}, []*types.FieldType{floatTp}, FixedSerializedKey}, - {[]int{0}, []*types.FieldType{doubleTp}, []*types.FieldType{doubleTp}, []*types.FieldType{doubleTp}, FixedSerializedKey}, - // fixed serialized key for multiple fixed size join keys - {[]int{0, 1}, []*types.FieldType{dateTp, intTp}, []*types.FieldType{dateTp, intTp}, []*types.FieldType{dateTp, intTp}, FixedSerializedKey}, - {[]int{0, 1}, []*types.FieldType{intTp, intTp}, []*types.FieldType{intTp, intTp}, []*types.FieldType{intTp, intTp}, 
FixedSerializedKey}, - // variable serialized key for decimal type - {[]int{0}, []*types.FieldType{decimalTp}, []*types.FieldType{decimalTp}, []*types.FieldType{decimalTp}, VariableSerializedKey}, - // variable serialized key for string related type - {[]int{0}, []*types.FieldType{enumTp}, []*types.FieldType{enumTp}, []*types.FieldType{enumTp}, VariableSerializedKey}, - {[]int{0}, []*types.FieldType{setTp}, []*types.FieldType{setTp}, []*types.FieldType{setTp}, VariableSerializedKey}, - {[]int{0}, []*types.FieldType{jsonTp}, []*types.FieldType{jsonTp}, []*types.FieldType{jsonTp}, VariableSerializedKey}, - {[]int{0}, []*types.FieldType{stringTp}, []*types.FieldType{stringTp}, []*types.FieldType{stringTp}, VariableSerializedKey}, - {[]int{0, 1}, []*types.FieldType{intTp, stringTp}, []*types.FieldType{intTp, stringTp}, []*types.FieldType{intTp, stringTp}, VariableSerializedKey}, - } - - for index, test := range testCases { - meta := newTableMeta(test.buildKeyIndex, test.buildTypes, test.buildKeyTypes, test.probeKeyTypes, nil, []int{}, false) - require.Equal(t, test.keyMode, meta.keyMode, "test index: "+strconv.Itoa(index)) - } -} - -func TestJoinTableMetaKeyInlinedAndFixed(t *testing.T) { - tinyTp := types.NewFieldType(mysql.TypeTiny) - intTp := types.NewFieldType(mysql.TypeLonglong) - uintTp := types.NewFieldType(mysql.TypeLonglong) - uintTp.AddFlag(mysql.UnsignedFlag) - yearTp := types.NewFieldType(mysql.TypeYear) - durationTp := types.NewFieldType(mysql.TypeDuration) - enumTp := types.NewFieldType(mysql.TypeEnum) - enumWithIntFlag := types.NewFieldType(mysql.TypeEnum) - enumWithIntFlag.AddFlag(mysql.EnumSetAsIntFlag) - setTp := types.NewFieldType(mysql.TypeSet) - bitTp := types.NewFieldType(mysql.TypeBit) - jsonTp := types.NewFieldType(mysql.TypeJSON) - floatTp := types.NewFieldType(mysql.TypeFloat) - doubleTp := types.NewFieldType(mysql.TypeDouble) - stringTp := types.NewFieldType(mysql.TypeVarString) - binaryStringTp := types.NewFieldType(mysql.TypeBlob) - dateTp 
:= types.NewFieldType(mysql.TypeDatetime) - decimalTp := types.NewFieldType(mysql.TypeNewDecimal) - - type testCase struct { - buildKeyIndex []int - buildTypes []*types.FieldType - buildKeyTypes []*types.FieldType - probeKeyTypes []*types.FieldType - isJoinKeysInlined bool - isJoinKeysFixedLength bool - joinKeysLength int - } - - testCases := []testCase{ - // inlined and fixed for int related type - {[]int{0}, []*types.FieldType{tinyTp}, []*types.FieldType{tinyTp}, []*types.FieldType{tinyTp}, true, true, 8}, - {[]int{0}, []*types.FieldType{intTp}, []*types.FieldType{intTp}, []*types.FieldType{intTp}, true, true, 8}, - {[]int{0}, []*types.FieldType{uintTp}, []*types.FieldType{uintTp}, []*types.FieldType{uintTp}, true, true, 8}, - {[]int{0}, []*types.FieldType{yearTp}, []*types.FieldType{yearTp}, []*types.FieldType{yearTp}, true, true, 8}, - {[]int{0}, []*types.FieldType{durationTp}, []*types.FieldType{durationTp}, []*types.FieldType{durationTp}, true, true, 8}, - // inlined and fixed for multiple fixed join keys - {[]int{0, 1}, []*types.FieldType{intTp, durationTp}, []*types.FieldType{intTp, durationTp}, []*types.FieldType{intTp, durationTp}, true, true, 16}, - // inlined but not fixed for binary string - {[]int{0}, []*types.FieldType{binaryStringTp}, []*types.FieldType{binaryStringTp}, []*types.FieldType{binaryStringTp}, true, false, -1}, - // inlined but not fixed for multiple join keys - {[]int{0, 1}, []*types.FieldType{binaryStringTp, intTp}, []*types.FieldType{binaryStringTp, intTp}, []*types.FieldType{binaryStringTp, intTp}, true, false, -1}, - // not inlined but fixed for some fixed size join key - {[]int{0}, []*types.FieldType{uintTp}, []*types.FieldType{uintTp}, []*types.FieldType{intTp}, false, true, 9}, - {[]int{0}, []*types.FieldType{enumWithIntFlag}, []*types.FieldType{enumWithIntFlag}, []*types.FieldType{enumWithIntFlag}, false, true, 8}, - {[]int{0}, []*types.FieldType{doubleTp}, []*types.FieldType{doubleTp}, []*types.FieldType{doubleTp}, false, true, 
8}, - {[]int{0}, []*types.FieldType{floatTp}, []*types.FieldType{floatTp}, []*types.FieldType{floatTp}, false, true, 8}, - {[]int{0}, []*types.FieldType{dateTp}, []*types.FieldType{dateTp}, []*types.FieldType{dateTp}, false, true, 8}, - {[]int{0}, []*types.FieldType{bitTp}, []*types.FieldType{bitTp}, []*types.FieldType{bitTp}, false, true, 8}, - {[]int{0, 1}, []*types.FieldType{bitTp, intTp}, []*types.FieldType{bitTp, intTp}, []*types.FieldType{bitTp, intTp}, false, true, 16}, - // not inlined and not fixed for decimal - {[]int{0}, []*types.FieldType{decimalTp}, []*types.FieldType{decimalTp}, []*types.FieldType{decimalTp}, false, false, -1}, - // not inlined and not fixed for non-binary string related types - {[]int{0}, []*types.FieldType{enumTp}, []*types.FieldType{enumTp}, []*types.FieldType{enumTp}, false, false, -1}, - {[]int{0}, []*types.FieldType{setTp}, []*types.FieldType{setTp}, []*types.FieldType{setTp}, false, false, -1}, - {[]int{0}, []*types.FieldType{stringTp}, []*types.FieldType{stringTp}, []*types.FieldType{stringTp}, false, false, -1}, - {[]int{0}, []*types.FieldType{jsonTp}, []*types.FieldType{jsonTp}, []*types.FieldType{jsonTp}, false, false, -1}, - // not inlined and not fixed for multiple join keys - {[]int{0, 1}, []*types.FieldType{decimalTp, intTp}, []*types.FieldType{decimalTp, intTp}, []*types.FieldType{decimalTp, intTp}, false, false, -1}, - {[]int{0, 1}, []*types.FieldType{enumTp, intTp}, []*types.FieldType{enumTp, intTp}, []*types.FieldType{enumTp, intTp}, false, false, -1}, - {[]int{0, 1}, []*types.FieldType{enumTp, decimalTp}, []*types.FieldType{enumTp, decimalTp}, []*types.FieldType{enumTp, decimalTp}, false, false, -1}, - } - - for index, test := range testCases { - meta := newTableMeta(test.buildKeyIndex, test.buildTypes, test.buildKeyTypes, test.probeKeyTypes, nil, []int{}, false) - require.Equal(t, test.isJoinKeysInlined, meta.isJoinKeysInlined, "test index: "+strconv.Itoa(index)) - require.Equal(t, test.isJoinKeysFixedLength, 
meta.isJoinKeysFixedLength, "test index: "+strconv.Itoa(index)) - require.Equal(t, test.joinKeysLength, meta.joinKeysLength, "test index: "+strconv.Itoa(index)) - } -} - -func TestReadNullMapThreadSafe(t *testing.T) { - // meta with usedFlag - tinyTp := types.NewFieldType(mysql.TypeTiny) - metaWithUsedFlag := newTableMeta([]int{0}, []*types.FieldType{tinyTp}, []*types.FieldType{tinyTp}, []*types.FieldType{tinyTp}, nil, []int{}, true) - for columnIndex := 0; columnIndex < 100; columnIndex++ { - require.Equal(t, columnIndex >= 31, metaWithUsedFlag.isReadNullMapThreadSafe(columnIndex)) - } - // meta without usedFlag - metaWithoutUsedFlag := newTableMeta([]int{0}, []*types.FieldType{tinyTp}, []*types.FieldType{tinyTp}, []*types.FieldType{tinyTp}, nil, []int{}, false) - for columnIndex := 0; columnIndex < 100; columnIndex++ { - require.Equal(t, true, metaWithoutUsedFlag.isReadNullMapThreadSafe(columnIndex)) - } -} - -func TestJoinTableMetaSerializedMode(t *testing.T) { - intTp := types.NewFieldType(mysql.TypeLonglong) - uintTp := types.NewFieldType(mysql.TypeLonglong) - uintTp.AddFlag(mysql.UnsignedFlag) - stringTp := types.NewFieldType(mysql.TypeVarString) - binaryStringTp := types.NewFieldType(mysql.TypeBlob) - decimalTp := types.NewFieldType(mysql.TypeNewDecimal) - enumTp := types.NewFieldType(mysql.TypeEnum) - enumWithIntFlag := types.NewFieldType(mysql.TypeEnum) - enumWithIntFlag.AddFlag(mysql.EnumSetAsIntFlag) - setTp := types.NewFieldType(mysql.TypeSet) - jsonTp := types.NewFieldType(mysql.TypeJSON) - - type testCase struct { - buildKeyIndex []int - buildTypes []*types.FieldType - buildKeyTypes []*types.FieldType - probeKeyTypes []*types.FieldType - serializeModes []codec.SerializeMode - } - testCases := []testCase{ - // normal case, no special serialize mode - {[]int{0, 1}, []*types.FieldType{decimalTp, intTp}, []*types.FieldType{decimalTp, intTp}, []*types.FieldType{decimalTp, intTp}, []codec.SerializeMode{codec.Normal, codec.Normal}}, - // test NeedSignFlag - 
{[]int{0, 1}, []*types.FieldType{uintTp, intTp}, []*types.FieldType{uintTp, intTp}, []*types.FieldType{intTp, intTp}, []codec.SerializeMode{codec.NeedSignFlag, codec.Normal}}, - {[]int{0}, []*types.FieldType{uintTp}, []*types.FieldType{uintTp}, []*types.FieldType{intTp}, []codec.SerializeMode{codec.NeedSignFlag}}, - // test KeepVarColumnLength - {[]int{0, 1}, []*types.FieldType{intTp, binaryStringTp}, []*types.FieldType{intTp, binaryStringTp}, []*types.FieldType{intTp, binaryStringTp}, []codec.SerializeMode{codec.Normal, codec.KeepVarColumnLength}}, - {[]int{0}, []*types.FieldType{binaryStringTp}, []*types.FieldType{binaryStringTp}, []*types.FieldType{binaryStringTp}, []codec.SerializeMode{codec.KeepVarColumnLength}}, - // binaryString is not inlined, no need to keep var column length - {[]int{0, 1}, []*types.FieldType{intTp, binaryStringTp}, []*types.FieldType{intTp, binaryStringTp}, []*types.FieldType{uintTp, binaryStringTp}, []codec.SerializeMode{codec.NeedSignFlag, codec.Normal}}, - // multiple var-length column, need keep var column length - {[]int{0, 1}, []*types.FieldType{stringTp, binaryStringTp}, []*types.FieldType{stringTp, binaryStringTp}, []*types.FieldType{stringTp, binaryStringTp}, []codec.SerializeMode{codec.KeepVarColumnLength, codec.KeepVarColumnLength}}, - {[]int{0, 1}, []*types.FieldType{stringTp, decimalTp}, []*types.FieldType{stringTp, decimalTp}, []*types.FieldType{stringTp, decimalTp}, []codec.SerializeMode{codec.KeepVarColumnLength, codec.KeepVarColumnLength}}, - // set/json/decimal/enum is treated as var-length column - {[]int{0, 1}, []*types.FieldType{setTp, jsonTp, decimalTp, enumTp}, []*types.FieldType{setTp, jsonTp, decimalTp, enumTp}, []*types.FieldType{setTp, jsonTp, decimalTp, enumTp}, []codec.SerializeMode{codec.KeepVarColumnLength, codec.KeepVarColumnLength, codec.KeepVarColumnLength, codec.KeepVarColumnLength}}, - {[]int{0, 1}, []*types.FieldType{setTp, jsonTp, decimalTp}, []*types.FieldType{setTp, jsonTp, decimalTp}, 
[]*types.FieldType{setTp, jsonTp, decimalTp}, []codec.SerializeMode{codec.KeepVarColumnLength, codec.KeepVarColumnLength, codec.KeepVarColumnLength}}, - {[]int{0, 1}, []*types.FieldType{jsonTp, decimalTp}, []*types.FieldType{jsonTp, decimalTp}, []*types.FieldType{jsonTp, decimalTp}, []codec.SerializeMode{codec.KeepVarColumnLength, codec.KeepVarColumnLength}}, - {[]int{0, 1}, []*types.FieldType{setTp, enumTp}, []*types.FieldType{setTp, enumTp}, []*types.FieldType{setTp, enumTp}, []codec.SerializeMode{codec.KeepVarColumnLength, codec.KeepVarColumnLength}}, - // enumWithIntFlag is fix length column - {[]int{0, 1}, []*types.FieldType{enumWithIntFlag, enumTp}, []*types.FieldType{enumWithIntFlag, enumTp}, []*types.FieldType{enumWithIntFlag, enumTp}, []codec.SerializeMode{codec.Normal, codec.Normal}}, - // single non-inlined var length column don't need keep var column length - {[]int{0, 1}, []*types.FieldType{setTp, enumWithIntFlag}, []*types.FieldType{setTp, enumWithIntFlag}, []*types.FieldType{setTp, enumWithIntFlag}, []codec.SerializeMode{codec.Normal, codec.Normal}}, - } - for index, test := range testCases { - meta := newTableMeta(test.buildKeyIndex, test.buildTypes, test.buildKeyTypes, test.probeKeyTypes, nil, []int{}, false) - for modeIndex, mode := range meta.serializeModes { - require.Equal(t, test.serializeModes[modeIndex], mode, meta.isJoinKeysFixedLength, "test index: "+strconv.Itoa(index)+", key index: "+strconv.Itoa(modeIndex)) - } - } -} - -func TestJoinTableMetaRowColumnsOrder(t *testing.T) { - intTp := types.NewFieldType(mysql.TypeLonglong) - uintTp := types.NewFieldType(mysql.TypeLonglong) - uintTp.AddFlag(mysql.UnsignedFlag) - enumWithIntFlag := types.NewFieldType(mysql.TypeEnum) - enumWithIntFlag.AddFlag(mysql.EnumSetAsIntFlag) - stringTp := types.NewFieldType(mysql.TypeVarString) - dateTp := types.NewFieldType(mysql.TypeDatetime) - decimalTp := types.NewFieldType(mysql.TypeNewDecimal) - type testCase struct { - buildKeyIndex []int - buildTypes 
[]*types.FieldType - buildKeyTypes []*types.FieldType - probeKeyTypes []*types.FieldType - columnsUsedByOtherCondition []int - outputColumns []int - rowColumnOrder []int - } - testCases := []testCase{ - // columns not used will not be converted to row format - {[]int{0}, []*types.FieldType{stringTp, intTp}, []*types.FieldType{stringTp}, []*types.FieldType{stringTp}, nil, []int{}, []int{}}, - // inlined keys will be converted to row format even is not needed by output columns - {[]int{1}, []*types.FieldType{intTp, intTp}, []*types.FieldType{intTp}, []*types.FieldType{intTp}, nil, []int{}, []int{1}}, - // inlined keys is the first columns - {[]int{2}, []*types.FieldType{intTp, intTp, intTp}, []*types.FieldType{intTp}, []*types.FieldType{intTp}, nil, []int{0, 1, 2}, []int{2, 0, 1}}, - // other condition columns will be first columns if key is not inlined - {[]int{0}, []*types.FieldType{stringTp, stringTp, dateTp, decimalTp}, []*types.FieldType{stringTp}, []*types.FieldType{stringTp}, []int{2, 3}, []int{0, 1, 2, 3}, []int{2, 3, 0, 1}}, - {[]int{0}, []*types.FieldType{stringTp, stringTp, dateTp, decimalTp}, []*types.FieldType{stringTp}, []*types.FieldType{stringTp}, []int{3, 2}, []int{0, 1, 2, 3}, []int{3, 2, 0, 1}}, - // other condition columns will be converted to row format even if not needed by output columns - {[]int{0}, []*types.FieldType{stringTp, stringTp, dateTp, decimalTp}, []*types.FieldType{stringTp}, []*types.FieldType{stringTp}, []int{3, 2}, []int{}, []int{3, 2}}, - // inlined keys + other condition columns + other columns - {[]int{4}, []*types.FieldType{stringTp, stringTp, dateTp, decimalTp, intTp}, []*types.FieldType{intTp}, []*types.FieldType{intTp}, []int{2, 0}, []int{0, 1, 2, 3, 4}, []int{4, 2, 0, 1, 3}}, - // not inlined key + no other condition, follow the same order in output columns - {[]int{0}, []*types.FieldType{stringTp, stringTp, dateTp, decimalTp, intTp}, []*types.FieldType{stringTp}, []*types.FieldType{stringTp}, nil, []int{4, 1, 0, 2, 3}, 
[]int{4, 1, 0, 2, 3}}, - // not inlined key + no other condition + nil output columns - {[]int{0}, []*types.FieldType{stringTp, stringTp, dateTp, decimalTp, intTp}, []*types.FieldType{stringTp}, []*types.FieldType{stringTp}, nil, nil, []int{0, 1, 2, 3, 4}}, - } - - for index, test := range testCases { - meta := newTableMeta(test.buildKeyIndex, test.buildTypes, test.buildKeyTypes, test.probeKeyTypes, test.columnsUsedByOtherCondition, test.outputColumns, false) - require.Equal(t, len(test.rowColumnOrder), len(meta.rowColumnsOrder), "test index: "+strconv.Itoa(index)) - for rowIndex, order := range test.rowColumnOrder { - require.Equal(t, order, meta.rowColumnsOrder[rowIndex], "test index: "+strconv.Itoa(index)+", row index: "+strconv.Itoa(rowIndex)) - } - } -} - -func TestJoinTableMetaNullMapLength(t *testing.T) { - intTp := types.NewFieldType(mysql.TypeLonglong) - uintTp := types.NewFieldType(mysql.TypeLonglong) - uintTp.AddFlag(mysql.UnsignedFlag) - notNullIntTp := types.NewFieldType(mysql.TypeLonglong) - notNullIntTp.SetFlag(mysql.NotNullFlag) - stringTp := types.NewFieldType(mysql.TypeVarString) - type testCase struct { - buildKeyIndex []int - buildTypes []*types.FieldType - buildKeyTypes []*types.FieldType - probeKeyTypes []*types.FieldType - outputColumns []int - needUsedFlag bool - nullMapLength int - } - testCases := []testCase{ - // usedFlag is false - // nullmap is 1 byte alignment - {[]int{0}, []*types.FieldType{intTp}, []*types.FieldType{intTp}, []*types.FieldType{intTp}, nil, false, 1}, - {[]int{0}, []*types.FieldType{intTp, intTp, intTp, intTp, intTp, intTp, intTp, intTp, intTp}, []*types.FieldType{intTp}, []*types.FieldType{intTp}, nil, false, 2}, - // even if columns is not null, nullmap is still needed - {[]int{0}, []*types.FieldType{notNullIntTp}, []*types.FieldType{notNullIntTp}, []*types.FieldType{notNullIntTp}, nil, false, 1}, - // nullmap only used for columns that needed to be converted to rows - {[]int{0}, []*types.FieldType{stringTp}, 
[]*types.FieldType{stringTp}, []*types.FieldType{stringTp}, []int{}, false, 0}, - {[]int{0}, []*types.FieldType{stringTp, intTp, intTp, intTp}, []*types.FieldType{stringTp}, []*types.FieldType{stringTp}, []int{}, false, 0}, - // usedFlag is true - // the row length is at least 4 bytes, so nullmap is 1 byte alignment - {[]int{0}, []*types.FieldType{intTp}, []*types.FieldType{intTp}, []*types.FieldType{intTp}, nil, true, 1}, - {[]int{0}, []*types.FieldType{stringTp, intTp}, []*types.FieldType{stringTp}, []*types.FieldType{stringTp}, []int{1}, true, 1}, - {[]int{0}, []*types.FieldType{stringTp, intTp}, []*types.FieldType{stringTp}, []*types.FieldType{stringTp}, []int{0}, true, 1}, - {[]int{0, 1}, []*types.FieldType{stringTp, intTp}, []*types.FieldType{stringTp, intTp}, []*types.FieldType{stringTp, intTp}, []int{}, true, 1}, - {[]int{0}, []*types.FieldType{intTp}, []*types.FieldType{intTp}, []*types.FieldType{uintTp}, []int{}, true, 1}, - // the row length is not guaranteed to be at least 4 bytes, nullmap is 4 bytes alignment - {[]int{0}, []*types.FieldType{stringTp}, []*types.FieldType{stringTp}, []*types.FieldType{stringTp}, []int{}, true, 4}, - {[]int{0, 1}, []*types.FieldType{stringTp, stringTp}, []*types.FieldType{stringTp, stringTp}, []*types.FieldType{stringTp, stringTp}, []int{}, true, 4}, - } - for index, test := range testCases { - meta := newTableMeta(test.buildKeyIndex, test.buildTypes, test.buildKeyTypes, test.probeKeyTypes, nil, test.outputColumns, test.needUsedFlag) - require.Equal(t, test.nullMapLength, meta.nullMapLength, "test index: "+strconv.Itoa(index)) - } -} +} \ No newline at end of file diff --git a/pkg/executor/join/join_table_meta.go b/pkg/executor/join/join_table_meta.go new file mode 100644 index 0000000000000..02ff39a3f4020 --- /dev/null +++ b/pkg/executor/join/join_table_meta.go @@ -0,0 +1,364 @@ +// Copyright 2024 PingCAP, Inc. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package join + +import ( + "sync/atomic" + "unsafe" + + "github.com/pingcap/tidb/pkg/parser/mysql" + "github.com/pingcap/tidb/pkg/types" + "github.com/pingcap/tidb/pkg/util/chunk" + "github.com/pingcap/tidb/pkg/util/codec" + "github.com/pingcap/tidb/pkg/util/collate" + "github.com/pingcap/tidb/pkg/util/hack" + "github.com/pingcap/tidb/pkg/util/serialization" +) + +// joinTableMeta is the join table meta used in hash join v2 +type joinTableMeta struct { + // if the row has fixed length + isFixedLength bool + // the row length if the row is fixed length + rowLength int + // if the join keys have fixed length + isJoinKeysFixedLength bool + // the join keys length if it is fixed length + joinKeysLength int + // is the join key inlined in the row data, the join key can be inlined if and only if + // 1. keyProp.canBeInlined returns true for all the keys + // 2. 
there is no duplicate join keys + isJoinKeysInlined bool + // the length of null map, the null map includes a null bit for each column in the row and the used flag for right semi/outer join + nullMapLength int + // the column order in row layout, the saved column order may be different from the column order in build schema + // for example, the build schema may be [col1, col2, col3], and the column order in row may be [col2, col1, col3], then this array + // is [1, 0, 2] + rowColumnsOrder []int + // the column size of each column, -1 means variable-length column, the order is the same as rowColumnsOrder + columnsSize []int + // the serialize mode for each key + serializeModes []codec.SerializeMode + // the first n columns in row are used for other condition, if a join has other condition, we only need to extract + // first n columns from the RowTable to evaluate other condition + columnCountNeededForOtherCondition int + // total column numbers for build side chunk, this is used to construct the chunk if there is join other condition + totalColumnNumber int + // column index offset in null map, will be 1 if there is usedFlag and 0 if there is no usedFlag + colOffsetInNullMap int + // keyMode is the key mode, it can be OneInt64/FixedSerializedKey/VariableSerializedKey + keyMode keyMode + // offset to rowData, -1 for variable length, non-inlined key + rowDataOffset int + // fakeKeyByte is used as the fake key when current join needs to keep invalid key rows + fakeKeyByte []byte +} + +func (meta *joinTableMeta) getSerializedKeyLength(rowStart unsafe.Pointer) uint64 { + return *(*uint64)(unsafe.Add(rowStart, sizeOfNextPtr+meta.nullMapLength)) +} + +func (meta *joinTableMeta) isReadNullMapThreadSafe(columnIndex int) bool { + // Other goroutines will use `atomic.StoreUint32` to write to the first 32 bits in nullmap when they need to set usedFlag + // so a read from nullMap may meet a concurrent write if meta.colOffsetInNullMap == 1 && (columnIndex + meta.colOffsetInNullMap < 
32) + mayConcurrentWrite := meta.colOffsetInNullMap == 1 && columnIndex < 31 + return !mayConcurrentWrite +} + +// getKeyBytes returns the join key bytes stored in the row starting at rowStart according to meta.keyMode, used in tests +func (meta *joinTableMeta) getKeyBytes(rowStart unsafe.Pointer) []byte { + switch meta.keyMode { + case OneInt64: + return hack.GetBytesFromPtr(unsafe.Add(rowStart, meta.nullMapLength+sizeOfNextPtr), int(serialization.Uint64Len)) + case FixedSerializedKey: + return hack.GetBytesFromPtr(unsafe.Add(rowStart, meta.nullMapLength+sizeOfNextPtr), meta.joinKeysLength) + case VariableSerializedKey: + return hack.GetBytesFromPtr(unsafe.Add(rowStart, meta.nullMapLength+sizeOfNextPtr+sizeOfLengthField), int(meta.getSerializedKeyLength(rowStart))) + default: + panic("unknown key match type") + } +} + +func (meta *joinTableMeta) advanceToRowData(matchedRowInfo *matchedRowInfo) { + if meta.rowDataOffset == -1 { + // variable length, non-inlined key + matchedRowInfo.buildRowOffset = sizeOfNextPtr + meta.nullMapLength + sizeOfLengthField + int(meta.getSerializedKeyLength(*(*unsafe.Pointer)(unsafe.Pointer(&matchedRowInfo.buildRowStart)))) + } else { + matchedRowInfo.buildRowOffset = meta.rowDataOffset + } +} + +func (meta *joinTableMeta) isColumnNull(rowStart unsafe.Pointer, columnIndex int) bool { + byteIndex := (columnIndex + meta.colOffsetInNullMap) / 8 + bitIndex := (columnIndex + meta.colOffsetInNullMap) % 8 + return *(*uint8)(unsafe.Add(rowStart, sizeOfNextPtr+byteIndex))&(uint8(1)<<(7-bitIndex)) != uint8(0) +} + +// isColumnNullThreadSafe is for joins that need to set the used flag during the probe stage: reading the first 32 bits of nullMap is not thread safe, so atomic.LoadUint32 is used to avoid a read-write conflict +func (*joinTableMeta) isColumnNullThreadSafe(rowStart unsafe.Pointer, columnIndex int) bool { + return atomic.LoadUint32((*uint32)(unsafe.Add(rowStart, sizeOfNextPtr)))&bitMaskInUint32[columnIndex+1] != uint32(0) +} + +func (*joinTableMeta) setUsedFlag(rowStart unsafe.Pointer) { + addr := (*uint32)(unsafe.Add(rowStart, sizeOfNextPtr)) + value := 
atomic.LoadUint32(addr) + value |= usedFlagMask + atomic.StoreUint32(addr, value) +} + +func (*joinTableMeta) isCurrentRowUsed(rowStart unsafe.Pointer) bool { + return (*(*uint32)(unsafe.Add(rowStart, sizeOfNextPtr)) & usedFlagMask) == usedFlagMask +} + +type keyProp struct { + canBeInlined bool + keyLength int + isKeyInteger bool + isKeyUnsigned bool +} + +func getKeyProp(tp *types.FieldType) *keyProp { + switch tp.GetType() { + case mysql.TypeTiny, mysql.TypeShort, mysql.TypeInt24, mysql.TypeLong, mysql.TypeLonglong, mysql.TypeYear, + mysql.TypeDuration: + isKeyUnsigned := mysql.HasUnsignedFlag(tp.GetFlag()) + if tp.GetType() == mysql.TypeYear { + // year type is always unsigned + isKeyUnsigned = true + } else if tp.GetType() == mysql.TypeDuration { + // duration type is always signed + isKeyUnsigned = false + } + return &keyProp{canBeInlined: true, keyLength: chunk.GetFixedLen(tp), isKeyInteger: true, isKeyUnsigned: isKeyUnsigned} + case mysql.TypeVarchar, mysql.TypeVarString, mysql.TypeString, mysql.TypeBlob, mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob: + collator := collate.GetCollator(tp.GetCollate()) + return &keyProp{canBeInlined: collate.CanUseRawMemAsKey(collator), keyLength: chunk.VarElemLen, isKeyInteger: false, isKeyUnsigned: false} + case mysql.TypeDate, mysql.TypeDatetime, mysql.TypeTimestamp: + // date related type will use uint64 as serialized key + return &keyProp{canBeInlined: false, keyLength: int(serialization.Uint64Len), isKeyInteger: true, isKeyUnsigned: true} + case mysql.TypeFloat: + // float will use float64 as serialized key + return &keyProp{canBeInlined: false, keyLength: int(serialization.Float64Len), isKeyInteger: false, isKeyUnsigned: false} + case mysql.TypeNewDecimal: + // Although decimal is fixed length, but its key is not fixed length + return &keyProp{canBeInlined: false, keyLength: chunk.VarElemLen, isKeyInteger: false, isKeyUnsigned: false} + case mysql.TypeEnum: + if mysql.HasEnumSetAsIntFlag(tp.GetFlag()) { 
+ // enum int type is always unsigned + return &keyProp{canBeInlined: false, keyLength: int(serialization.Uint64Len), isKeyInteger: true, isKeyUnsigned: true} + } + return &keyProp{canBeInlined: false, keyLength: chunk.VarElemLen, isKeyInteger: false, isKeyUnsigned: false} + case mysql.TypeBit: + // bit type is always unsigned + return &keyProp{canBeInlined: false, keyLength: int(serialization.Uint64Len), isKeyInteger: true, isKeyUnsigned: true} + default: + keyLength := chunk.GetFixedLen(tp) + return &keyProp{canBeInlined: false, keyLength: keyLength, isKeyInteger: false, isKeyUnsigned: false} + } +} + +// newTableMeta creates the joinTableMeta: buildKeyIndex is the build key column index based on buildSchema, should not be nil +// columnsUsedByOtherCondition is the column index that will be used in other condition, if no other condition, will be nil +// outputColumns is the column index that need to be converted to row, nil means all the columns are needed +// needUsedFlag is true for outer/semi join that use outer to build +func newTableMeta(buildKeyIndex []int, buildTypes, buildKeyTypes, probeKeyTypes []*types.FieldType, columnsUsedByOtherCondition []int, outputColumns []int, needUsedFlag bool) *joinTableMeta { + meta := &joinTableMeta{} + meta.isFixedLength = true + meta.rowLength = 0 + meta.totalColumnNumber = len(buildTypes) + + columnsNeedToBeSaved := make(map[int]struct{}, len(buildTypes)) + updateMeta := func(index int) { + if _, ok := columnsNeedToBeSaved[index]; !ok { + columnsNeedToBeSaved[index] = struct{}{} + length := chunk.GetFixedLen(buildTypes[index]) + if length == chunk.VarElemLen { + meta.isFixedLength = false + } else { + meta.rowLength += length + } + } + } + if outputColumns == nil { + // outputColumns = nil means all the column is needed + for index := range buildTypes { + updateMeta(index) + } + } else { + for _, index := range outputColumns { + updateMeta(index) + } + for _, index := range columnsUsedByOtherCondition { + updateMeta(index) + } + } + + meta.isJoinKeysFixedLength = true + 
meta.joinKeysLength = 0 + meta.isJoinKeysInlined = true + keyIndexMap := make(map[int]struct{}) + meta.serializeModes = make([]codec.SerializeMode, 0, len(buildKeyIndex)) + isAllKeyInteger := true + hasFixedSizeKeyColumn := false + varLengthKeyNumber := 0 + for index, keyIndex := range buildKeyIndex { + keyType := buildKeyTypes[index] + prop := getKeyProp(keyType) + if prop.keyLength != chunk.VarElemLen { + meta.joinKeysLength += prop.keyLength + hasFixedSizeKeyColumn = true + } else { + meta.isJoinKeysFixedLength = false + varLengthKeyNumber++ + } + if !prop.canBeInlined { + meta.isJoinKeysInlined = false + } + if prop.isKeyInteger { + buildUnsigned := prop.isKeyUnsigned + probeKeyProp := getKeyProp(probeKeyTypes[index]) + if !probeKeyProp.isKeyInteger { + panic("build key is integer but probe key is not integer, should not happens") + } + probeUnsigned := probeKeyProp.isKeyUnsigned + if (buildUnsigned && !probeUnsigned) || (probeUnsigned && !buildUnsigned) { + meta.serializeModes = append(meta.serializeModes, codec.NeedSignFlag) + meta.isJoinKeysInlined = false + if meta.isJoinKeysFixedLength { + // an extra sign flag is needed in this case + meta.joinKeysLength++ + } + } else { + meta.serializeModes = append(meta.serializeModes, codec.Normal) + } + } else { + if !prop.isKeyInteger { + isAllKeyInteger = false + } + if prop.keyLength == chunk.VarElemLen { + // keep var column by default for var length column + meta.serializeModes = append(meta.serializeModes, codec.KeepVarColumnLength) + } else { + meta.serializeModes = append(meta.serializeModes, codec.Normal) + } + } + keyIndexMap[keyIndex] = struct{}{} + } + if !meta.isJoinKeysFixedLength { + meta.joinKeysLength = -1 + } + if len(buildKeyIndex) != len(keyIndexMap) { + // has duplicated key, can not be inlined + meta.isJoinKeysInlined = false + } + if !meta.isJoinKeysInlined { + if varLengthKeyNumber == 1 { + // if key is not inlined and there is only one var-length key, then don't need to record the var length 
+ for i := 0; i < len(buildKeyIndex); i++ { + if meta.serializeModes[i] == codec.KeepVarColumnLength { + meta.serializeModes[i] = codec.Normal + } + } + } + } else { + for _, index := range buildKeyIndex { + updateMeta(index) + } + } + if !meta.isFixedLength { + meta.rowLength = 0 + } + // construct the column order + meta.rowColumnsOrder = make([]int, 0, len(columnsNeedToBeSaved)) + meta.columnsSize = make([]int, 0, len(columnsNeedToBeSaved)) + usedColumnMap := make(map[int]struct{}, len(columnsNeedToBeSaved)) + + updateColumnOrder := func(index int) { + if _, ok := usedColumnMap[index]; !ok { + meta.rowColumnsOrder = append(meta.rowColumnsOrder, index) + meta.columnsSize = append(meta.columnsSize, chunk.GetFixedLen(buildTypes[index])) + usedColumnMap[index] = struct{}{} + } + } + if meta.isJoinKeysInlined { + // if join key is inlined, the join key will be the first columns + for _, index := range buildKeyIndex { + updateColumnOrder(index) + } + } + meta.columnCountNeededForOtherCondition = 0 + if len(columnsUsedByOtherCondition) > 0 { + // if join has other condition, the columns used by other condition is appended to row layout after the key + for _, index := range columnsUsedByOtherCondition { + updateColumnOrder(index) + } + meta.columnCountNeededForOtherCondition = len(usedColumnMap) + } + if outputColumns == nil { + // outputColumns = nil means all the column is needed + for index := range buildTypes { + updateColumnOrder(index) + } + } else { + for _, index := range outputColumns { + updateColumnOrder(index) + } + } + if isAllKeyInteger && len(buildKeyIndex) == 1 && meta.serializeModes[0] != codec.NeedSignFlag { + meta.keyMode = OneInt64 + } else { + if meta.isJoinKeysFixedLength { + meta.keyMode = FixedSerializedKey + } else { + meta.keyMode = VariableSerializedKey + } + } + if needUsedFlag { + meta.colOffsetInNullMap = 1 + // the total row length should be larger than 4 byte since the smallest unit of atomic.LoadXXX is UInt32 + if 
len(columnsNeedToBeSaved) > 0 { + // the smallest length of a column is 4 byte, so the total row length is enough + meta.nullMapLength = (len(columnsNeedToBeSaved) + 1 + 7) / 8 + } else { + // if no columns need to be converted to row format, then the key is not inlined + // 1. if any of the key columns is fixed length, then the row length is larger than 4 bytes(since the smallest length of a fixed length column is 4 bytes) + // 2. if all the key columns are variable length, there is no guarantee that the row length is larger than 4 byte, the nullmap should be 4 bytes alignment + if hasFixedSizeKeyColumn { + meta.nullMapLength = (len(columnsNeedToBeSaved) + 1 + 7) / 8 + } else { + meta.nullMapLength = ((len(columnsNeedToBeSaved) + 1 + 31) / 32) * 4 + } + } + } else { + meta.colOffsetInNullMap = 0 + meta.nullMapLength = (len(columnsNeedToBeSaved) + 7) / 8 + } + meta.rowDataOffset = -1 + if meta.isJoinKeysInlined { + if meta.isJoinKeysFixedLength { + meta.rowDataOffset = sizeOfNextPtr + meta.nullMapLength + } else { + meta.rowDataOffset = sizeOfNextPtr + meta.nullMapLength + sizeOfLengthField + } + } else { + if meta.isJoinKeysFixedLength { + meta.rowDataOffset = sizeOfNextPtr + meta.nullMapLength + meta.joinKeysLength + } + } + if meta.isJoinKeysFixedLength && !meta.isJoinKeysInlined { + meta.fakeKeyByte = make([]byte, meta.joinKeysLength) + } + return meta +} diff --git a/pkg/executor/join/join_table_meta_test.go b/pkg/executor/join/join_table_meta_test.go new file mode 100644 index 0000000000000..8ca39552ef8de --- /dev/null +++ b/pkg/executor/join/join_table_meta_test.go @@ -0,0 +1,315 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package join
+
+import (
+	"strconv"
+	"testing"
+
+	"github.com/pingcap/tidb/pkg/parser/mysql"
+	"github.com/pingcap/tidb/pkg/types"
+	"github.com/pingcap/tidb/pkg/util/codec"
+	"github.com/stretchr/testify/require"
+)
+
+// TestJoinTableMetaKeyMode checks the keyMode that newTableMeta selects for
+// different combinations of build-side and probe-side join key types.
+func TestJoinTableMetaKeyMode(t *testing.T) {
+	tinyTp := types.NewFieldType(mysql.TypeTiny)
+	intTp := types.NewFieldType(mysql.TypeLonglong)
+	uintTp := types.NewFieldType(mysql.TypeLonglong)
+	uintTp.AddFlag(mysql.UnsignedFlag)
+	yearTp := types.NewFieldType(mysql.TypeYear)
+	durationTp := types.NewFieldType(mysql.TypeDuration)
+	enumTp := types.NewFieldType(mysql.TypeEnum)
+	enumWithIntFlag := types.NewFieldType(mysql.TypeEnum)
+	enumWithIntFlag.AddFlag(mysql.EnumSetAsIntFlag)
+	setTp := types.NewFieldType(mysql.TypeSet)
+	bitTp := types.NewFieldType(mysql.TypeBit)
+	jsonTp := types.NewFieldType(mysql.TypeJSON)
+	floatTp := types.NewFieldType(mysql.TypeFloat)
+	doubleTp := types.NewFieldType(mysql.TypeDouble)
+	stringTp := types.NewFieldType(mysql.TypeVarString)
+	dateTp := types.NewFieldType(mysql.TypeDatetime)
+	decimalTp := types.NewFieldType(mysql.TypeNewDecimal)
+	// ft is shorthand for a []*types.FieldType literal.
+	ft := func(tps ...*types.FieldType) []*types.FieldType { return tps }
+
+	type testCase struct {
+		buildKeyIndex []int
+		buildTypes    []*types.FieldType
+		buildKeyTypes []*types.FieldType
+		probeKeyTypes []*types.FieldType
+		keyMode       keyMode
+	}
+
+	testCases := []testCase{
+		// OneInt64 for some basic fixed size types
+		{[]int{0}, ft(tinyTp), ft(tinyTp), ft(tinyTp), OneInt64},
+		{[]int{0}, ft(yearTp), ft(yearTp), ft(yearTp), OneInt64},
+		{[]int{0}, ft(durationTp), ft(durationTp), ft(durationTp), OneInt64},
+		{[]int{0}, ft(bitTp), ft(bitTp), ft(bitTp), OneInt64},
+		{[]int{0}, ft(intTp), ft(intTp), ft(intTp), OneInt64},
+		{[]int{0}, ft(uintTp), ft(uintTp), ft(uintTp), OneInt64},
+		{[]int{0}, ft(dateTp), ft(dateTp), ft(dateTp), OneInt64},
+		{[]int{0}, ft(enumWithIntFlag), ft(enumWithIntFlag), ft(enumWithIntFlag), OneInt64},
+		// fixed serialized key for uint = int
+		{[]int{0}, ft(intTp), ft(intTp), ft(uintTp), FixedSerializedKey},
+		{[]int{0}, ft(uintTp), ft(uintTp), ft(intTp), FixedSerializedKey},
+		// fixed serialized key for float/double
+		{[]int{0}, ft(floatTp), ft(floatTp), ft(floatTp), FixedSerializedKey},
+		{[]int{0}, ft(doubleTp), ft(doubleTp), ft(doubleTp), FixedSerializedKey},
+		// fixed serialized key for multiple fixed size join keys
+		{[]int{0, 1}, ft(dateTp, intTp), ft(dateTp, intTp), ft(dateTp, intTp), FixedSerializedKey},
+		{[]int{0, 1}, ft(intTp, intTp), ft(intTp, intTp), ft(intTp, intTp), FixedSerializedKey},
+		// variable serialized key for decimal type
+		{[]int{0}, ft(decimalTp), ft(decimalTp), ft(decimalTp), VariableSerializedKey},
+		// variable serialized key for string related types
+		{[]int{0}, ft(enumTp), ft(enumTp), ft(enumTp), VariableSerializedKey},
+		{[]int{0}, ft(setTp), ft(setTp), ft(setTp), VariableSerializedKey},
+		{[]int{0}, ft(jsonTp), ft(jsonTp), ft(jsonTp), VariableSerializedKey},
+		{[]int{0}, ft(stringTp), ft(stringTp), ft(stringTp), VariableSerializedKey},
+		{[]int{0, 1}, ft(intTp, stringTp), ft(intTp, stringTp), ft(intTp, stringTp), VariableSerializedKey},
+	}
+
+	for index, test := range testCases {
+		meta := newTableMeta(test.buildKeyIndex, test.buildTypes, test.buildKeyTypes, test.probeKeyTypes, nil, []int{}, false)
+		require.Equal(t, test.keyMode, meta.keyMode, "test index: "+strconv.Itoa(index))
+	}
+}
+
+// TestJoinTableMetaKeyInlinedAndFixed checks the isJoinKeysInlined,
+// isJoinKeysFixedLength and joinKeysLength fields that newTableMeta computes
+// for different join key types.
+func TestJoinTableMetaKeyInlinedAndFixed(t *testing.T) {
+	tinyTp := types.NewFieldType(mysql.TypeTiny)
+	intTp := types.NewFieldType(mysql.TypeLonglong)
+	uintTp := types.NewFieldType(mysql.TypeLonglong)
+	uintTp.AddFlag(mysql.UnsignedFlag)
+	yearTp := types.NewFieldType(mysql.TypeYear)
+	durationTp := types.NewFieldType(mysql.TypeDuration)
+	enumTp := types.NewFieldType(mysql.TypeEnum)
+	enumWithIntFlag := types.NewFieldType(mysql.TypeEnum)
+	enumWithIntFlag.AddFlag(mysql.EnumSetAsIntFlag)
+	setTp := types.NewFieldType(mysql.TypeSet)
+	bitTp := types.NewFieldType(mysql.TypeBit)
+	jsonTp := types.NewFieldType(mysql.TypeJSON)
+	floatTp := types.NewFieldType(mysql.TypeFloat)
+	doubleTp := types.NewFieldType(mysql.TypeDouble)
+	stringTp := types.NewFieldType(mysql.TypeVarString)
+	binaryStringTp := types.NewFieldType(mysql.TypeBlob)
+	dateTp := types.NewFieldType(mysql.TypeDatetime)
+	decimalTp := types.NewFieldType(mysql.TypeNewDecimal)
+	// ft is shorthand for a []*types.FieldType literal.
+	ft := func(tps ...*types.FieldType) []*types.FieldType { return tps }
+
+	type testCase struct {
+		buildKeyIndex         []int
+		buildTypes            []*types.FieldType
+		buildKeyTypes         []*types.FieldType
+		probeKeyTypes         []*types.FieldType
+		isJoinKeysInlined     bool
+		isJoinKeysFixedLength bool
+		joinKeysLength        int
+	}
+
+	testCases := []testCase{
+		// inlined and fixed for int related types
+		{[]int{0}, ft(tinyTp), ft(tinyTp), ft(tinyTp), true, true, 8},
+		{[]int{0}, ft(intTp), ft(intTp), ft(intTp), true, true, 8},
+		{[]int{0}, ft(uintTp), ft(uintTp), ft(uintTp), true, true, 8},
+		{[]int{0}, ft(yearTp), ft(yearTp), ft(yearTp), true, true, 8},
+		{[]int{0}, ft(durationTp), ft(durationTp), ft(durationTp), true, true, 8},
+		// inlined and fixed for multiple fixed join keys
+		{[]int{0, 1}, ft(intTp, durationTp), ft(intTp, durationTp), ft(intTp, durationTp), true, true, 16},
+		// inlined but not fixed for binary string
+		{[]int{0}, ft(binaryStringTp), ft(binaryStringTp), ft(binaryStringTp), true, false, -1},
+		// inlined but not fixed for multiple join keys
+		{[]int{0, 1}, ft(binaryStringTp, intTp), ft(binaryStringTp, intTp), ft(binaryStringTp, intTp), true, false, -1},
+		// not inlined but fixed for some fixed size join keys
+		{[]int{0}, ft(uintTp), ft(uintTp), ft(intTp), false, true, 9},
+		{[]int{0}, ft(enumWithIntFlag), ft(enumWithIntFlag), ft(enumWithIntFlag), false, true, 8},
+		{[]int{0}, ft(doubleTp), ft(doubleTp), ft(doubleTp), false, true, 8},
+		{[]int{0}, ft(floatTp), ft(floatTp), ft(floatTp), false, true, 8},
+		{[]int{0}, ft(dateTp), ft(dateTp), ft(dateTp), false, true, 8},
+		{[]int{0}, ft(bitTp), ft(bitTp), ft(bitTp), false, true, 8},
+		{[]int{0, 1}, ft(bitTp, intTp), ft(bitTp, intTp), ft(bitTp, intTp), false, true, 16},
+		// not inlined and not fixed for decimal
+		{[]int{0}, ft(decimalTp), ft(decimalTp), ft(decimalTp), false, false, -1},
+		// not inlined and not fixed for non-binary string related types
+		{[]int{0}, ft(enumTp), ft(enumTp), ft(enumTp), false, false, -1},
+		{[]int{0}, ft(setTp), ft(setTp), ft(setTp), false, false, -1},
+		{[]int{0}, ft(stringTp), ft(stringTp), ft(stringTp), false, false, -1},
+		{[]int{0}, ft(jsonTp), ft(jsonTp), ft(jsonTp), false, false, -1},
+		// not inlined and not fixed for multiple join keys
+		{[]int{0, 1}, ft(decimalTp, intTp), ft(decimalTp, intTp), ft(decimalTp, intTp), false, false, -1},
+		{[]int{0, 1}, ft(enumTp, intTp), ft(enumTp, intTp), ft(enumTp, intTp), false, false, -1},
+		{[]int{0, 1}, ft(enumTp, decimalTp), ft(enumTp, decimalTp), ft(enumTp, decimalTp), false, false, -1},
+	}
+
+	for index, test := range testCases {
+		meta := newTableMeta(test.buildKeyIndex, test.buildTypes, test.buildKeyTypes, test.probeKeyTypes, nil, []int{}, false)
+		require.Equal(t, test.isJoinKeysInlined, meta.isJoinKeysInlined, "test index: "+strconv.Itoa(index))
+		require.Equal(t, test.isJoinKeysFixedLength, meta.isJoinKeysFixedLength, "test index: "+strconv.Itoa(index))
+		require.Equal(t, test.joinKeysLength, meta.joinKeysLength, "test index: "+strconv.Itoa(index))
+	}
+}
+
+// TestReadNullMapThreadSafe checks isReadNullMapThreadSafe for column indexes
+// 0..99, with and without the used flag enabled in the table meta.
+func TestReadNullMapThreadSafe(t *testing.T) {
+	// meta with usedFlag: only column indexes >= 31 are expected to be safe
+	tinyTp := types.NewFieldType(mysql.TypeTiny)
+	metaWithUsedFlag := newTableMeta([]int{0}, []*types.FieldType{tinyTp}, []*types.FieldType{tinyTp}, []*types.FieldType{tinyTp}, nil, []int{}, true)
+	for columnIndex := 0; columnIndex < 100; columnIndex++ {
+		require.Equal(t, columnIndex >= 31, metaWithUsedFlag.isReadNullMapThreadSafe(columnIndex))
+	}
+	// meta without usedFlag: every column index is expected to be safe
+	metaWithoutUsedFlag := newTableMeta([]int{0}, []*types.FieldType{tinyTp}, []*types.FieldType{tinyTp}, []*types.FieldType{tinyTp}, nil, []int{}, false)
+	for columnIndex := 0; columnIndex < 100; columnIndex++ {
+		require.Equal(t, true, metaWithoutUsedFlag.isReadNullMapThreadSafe(columnIndex))
+	}
+}
+
+// TestJoinTableMetaSerializedMode checks the per-key codec.SerializeMode that
+// newTableMeta stores in serializeModes.
+//
+// NOTE(review): several cases list more key types than buildKeyIndex entries,
+// and the loop ranges over meta.serializeModes, so expected modes beyond
+// len(meta.serializeModes) are never compared. Confirm this is intended.
+func TestJoinTableMetaSerializedMode(t *testing.T) {
+	intTp := types.NewFieldType(mysql.TypeLonglong)
+	uintTp := types.NewFieldType(mysql.TypeLonglong)
+	uintTp.AddFlag(mysql.UnsignedFlag)
+	stringTp := types.NewFieldType(mysql.TypeVarString)
+	binaryStringTp := types.NewFieldType(mysql.TypeBlob)
+	decimalTp := types.NewFieldType(mysql.TypeNewDecimal)
+	enumTp := types.NewFieldType(mysql.TypeEnum)
+	enumWithIntFlag := types.NewFieldType(mysql.TypeEnum)
+	enumWithIntFlag.AddFlag(mysql.EnumSetAsIntFlag)
+	setTp := types.NewFieldType(mysql.TypeSet)
+	jsonTp := types.NewFieldType(mysql.TypeJSON)
+	// ft is shorthand for a []*types.FieldType literal.
+	ft := func(tps ...*types.FieldType) []*types.FieldType { return tps }
+	const (
+		normal   = codec.Normal
+		signFlag = codec.NeedSignFlag
+		keepLen  = codec.KeepVarColumnLength
+	)
+
+	type testCase struct {
+		buildKeyIndex  []int
+		buildTypes     []*types.FieldType
+		buildKeyTypes  []*types.FieldType
+		probeKeyTypes  []*types.FieldType
+		serializeModes []codec.SerializeMode
+	}
+	testCases := []testCase{
+		// normal case, no special serialize mode
+		{[]int{0, 1}, ft(decimalTp, intTp), ft(decimalTp, intTp), ft(decimalTp, intTp), []codec.SerializeMode{normal, normal}},
+		// test NeedSignFlag
+		{[]int{0, 1}, ft(uintTp, intTp), ft(uintTp, intTp), ft(intTp, intTp), []codec.SerializeMode{signFlag, normal}},
+		{[]int{0}, ft(uintTp), ft(uintTp), ft(intTp), []codec.SerializeMode{signFlag}},
+		// test KeepVarColumnLength
+		{[]int{0, 1}, ft(intTp, binaryStringTp), ft(intTp, binaryStringTp), ft(intTp, binaryStringTp), []codec.SerializeMode{normal, keepLen}},
+		{[]int{0}, ft(binaryStringTp), ft(binaryStringTp), ft(binaryStringTp), []codec.SerializeMode{keepLen}},
+		// binaryString is not inlined, no need to keep var column length
+		{[]int{0, 1}, ft(intTp, binaryStringTp), ft(intTp, binaryStringTp), ft(uintTp, binaryStringTp), []codec.SerializeMode{signFlag, normal}},
+		// multiple var-length columns, need to keep var column length
+		{[]int{0, 1}, ft(stringTp, binaryStringTp), ft(stringTp, binaryStringTp), ft(stringTp, binaryStringTp), []codec.SerializeMode{keepLen, keepLen}},
+		{[]int{0, 1}, ft(stringTp, decimalTp), ft(stringTp, decimalTp), ft(stringTp, decimalTp), []codec.SerializeMode{keepLen, keepLen}},
+		// set/json/decimal/enum are treated as var-length columns
+		{[]int{0, 1}, ft(setTp, jsonTp, decimalTp, enumTp), ft(setTp, jsonTp, decimalTp, enumTp), ft(setTp, jsonTp, decimalTp, enumTp), []codec.SerializeMode{keepLen, keepLen, keepLen, keepLen}},
+		{[]int{0, 1}, ft(setTp, jsonTp, decimalTp), ft(setTp, jsonTp, decimalTp), ft(setTp, jsonTp, decimalTp), []codec.SerializeMode{keepLen, keepLen, keepLen}},
+		{[]int{0, 1}, ft(jsonTp, decimalTp), ft(jsonTp, decimalTp), ft(jsonTp, decimalTp), []codec.SerializeMode{keepLen, keepLen}},
+		{[]int{0, 1}, ft(setTp, enumTp), ft(setTp, enumTp), ft(setTp, enumTp), []codec.SerializeMode{keepLen, keepLen}},
+		// enumWithIntFlag is a fixed length column
+		{[]int{0, 1}, ft(enumWithIntFlag, enumTp), ft(enumWithIntFlag, enumTp), ft(enumWithIntFlag, enumTp), []codec.SerializeMode{normal, normal}},
+		// a single non-inlined var-length column doesn't need to keep var column length
+		{[]int{0, 1}, ft(setTp, enumWithIntFlag), ft(setTp, enumWithIntFlag), ft(setTp, enumWithIntFlag), []codec.SerializeMode{normal, normal}},
+	}
+	for index, test := range testCases {
+		meta := newTableMeta(test.buildKeyIndex, test.buildTypes, test.buildKeyTypes, test.probeKeyTypes, nil, []int{}, false)
+		for modeIndex, mode := range meta.serializeModes {
+			// The failure message must be a single string (or a format string
+			// followed by args); passing the bool first would break message
+			// formatting exactly when the assertion fails.
+			msg := "test index: " + strconv.Itoa(index) + ", key index: " + strconv.Itoa(modeIndex) +
+				", isJoinKeysFixedLength: " + strconv.FormatBool(meta.isJoinKeysFixedLength)
+			require.Equal(t, test.serializeModes[modeIndex], mode, msg)
+		}
+	}
+}
+
+// TestJoinTableMetaRowColumnsOrder checks rowColumnsOrder, i.e. which build
+// columns newTableMeta selects for the row format and in which order.
+func TestJoinTableMetaRowColumnsOrder(t *testing.T) {
+	intTp := types.NewFieldType(mysql.TypeLonglong)
+	uintTp := types.NewFieldType(mysql.TypeLonglong)
+	uintTp.AddFlag(mysql.UnsignedFlag)
+	enumWithIntFlag := types.NewFieldType(mysql.TypeEnum)
+	enumWithIntFlag.AddFlag(mysql.EnumSetAsIntFlag)
+	stringTp := types.NewFieldType(mysql.TypeVarString)
+	dateTp := types.NewFieldType(mysql.TypeDatetime)
+	decimalTp := types.NewFieldType(mysql.TypeNewDecimal)
+	// ft is shorthand for a []*types.FieldType literal.
+	ft := func(tps ...*types.FieldType) []*types.FieldType { return tps }
+	type testCase struct {
+		buildKeyIndex               []int
+		buildTypes                  []*types.FieldType
+		buildKeyTypes               []*types.FieldType
+		probeKeyTypes               []*types.FieldType
+		columnsUsedByOtherCondition []int
+		outputColumns               []int
+		rowColumnOrder              []int
+	}
+	testCases := []testCase{
+		// columns that are not used will not be converted to row format
+		{[]int{0}, ft(stringTp, intTp), ft(stringTp), ft(stringTp), nil, []int{}, []int{}},
+		// inlined keys will be converted to row format even if not needed by output columns
+		{[]int{1}, ft(intTp, intTp), ft(intTp), ft(intTp), nil, []int{}, []int{1}},
+		// inlined keys are the first columns
+		{[]int{2}, ft(intTp, intTp, intTp), ft(intTp), ft(intTp), nil, []int{0, 1, 2}, []int{2, 0, 1}},
+		// other condition columns will be the first columns if the key is not inlined
+		{[]int{0}, ft(stringTp, stringTp, dateTp, decimalTp), ft(stringTp), ft(stringTp), []int{2, 3}, []int{0, 1, 2, 3}, []int{2, 3, 0, 1}},
+		{[]int{0}, ft(stringTp, stringTp, dateTp, decimalTp), ft(stringTp), ft(stringTp), []int{3, 2}, []int{0, 1, 2, 3}, []int{3, 2, 0, 1}},
+		// other condition columns will be converted to row format even if not needed by output columns
+		{[]int{0}, ft(stringTp, stringTp, dateTp, decimalTp), ft(stringTp), ft(stringTp), []int{3, 2}, []int{}, []int{3, 2}},
+		// inlined keys + other condition columns + other columns
+		{[]int{4}, ft(stringTp, stringTp, dateTp, decimalTp, intTp), ft(intTp), ft(intTp), []int{2, 0}, []int{0, 1, 2, 3, 4}, []int{4, 2, 0, 1, 3}},
+		// not inlined key + no other condition, follow the same order as output columns
+		{[]int{0}, ft(stringTp, stringTp, dateTp, decimalTp, intTp), ft(stringTp), ft(stringTp), nil, []int{4, 1, 0, 2, 3}, []int{4, 1, 0, 2, 3}},
+		// not inlined key + no other condition + nil output columns
+		{[]int{0}, ft(stringTp, stringTp, dateTp, decimalTp, intTp), ft(stringTp), ft(stringTp), nil, nil, []int{0, 1, 2, 3, 4}},
+	}
+
+	for index, test := range testCases {
+		meta := newTableMeta(test.buildKeyIndex, test.buildTypes, test.buildKeyTypes, test.probeKeyTypes, test.columnsUsedByOtherCondition, test.outputColumns, false)
+		require.Equal(t, len(test.rowColumnOrder), len(meta.rowColumnsOrder), "test index: "+strconv.Itoa(index))
+		for rowIndex, order := range test.rowColumnOrder {
+			require.Equal(t, order, meta.rowColumnsOrder[rowIndex], "test index: "+strconv.Itoa(index)+", row index: "+strconv.Itoa(rowIndex))
+		}
+	}
+}
+
+// TestJoinTableMetaNullMapLength checks nullMapLength for different numbers
+// of row columns, with and without the used flag.
+func TestJoinTableMetaNullMapLength(t *testing.T) {
+	intTp := types.NewFieldType(mysql.TypeLonglong)
+	uintTp := types.NewFieldType(mysql.TypeLonglong)
+	uintTp.AddFlag(mysql.UnsignedFlag)
+	notNullIntTp := types.NewFieldType(mysql.TypeLonglong)
+	notNullIntTp.SetFlag(mysql.NotNullFlag)
+	stringTp := types.NewFieldType(mysql.TypeVarString)
+	// ft is shorthand for a []*types.FieldType literal.
+	ft := func(tps ...*types.FieldType) []*types.FieldType { return tps }
+	type testCase struct {
+		buildKeyIndex []int
+		buildTypes    []*types.FieldType
+		buildKeyTypes []*types.FieldType
+		probeKeyTypes []*types.FieldType
+		outputColumns []int
+		needUsedFlag  bool
+		nullMapLength int
+	}
+	testCases := []testCase{
+		// usedFlag is false
+		// nullmap is 1-byte aligned
+		{[]int{0}, ft(intTp), ft(intTp), ft(intTp), nil, false, 1},
+		{[]int{0}, ft(intTp, intTp, intTp, intTp, intTp, intTp, intTp, intTp, intTp), ft(intTp), ft(intTp), nil, false, 2},
+		// even if the columns are not null, nullmap is still needed
+		{[]int{0}, ft(notNullIntTp), ft(notNullIntTp), ft(notNullIntTp), nil, false, 1},
+		// nullmap is only used for columns that need to be converted to rows
+		{[]int{0}, ft(stringTp), ft(stringTp), ft(stringTp), []int{}, false, 0},
+		{[]int{0}, ft(stringTp, intTp, intTp, intTp), ft(stringTp), ft(stringTp), []int{}, false, 0},
+		// usedFlag is true
+		// the row length is at least 4 bytes, so nullmap is 1-byte aligned
+		{[]int{0}, ft(intTp), ft(intTp), ft(intTp), nil, true, 1},
+		{[]int{0}, ft(stringTp, intTp), ft(stringTp), ft(stringTp), []int{1}, true, 1},
+		{[]int{0}, ft(stringTp, intTp), ft(stringTp), ft(stringTp), []int{0}, true, 1},
+		{[]int{0, 1}, ft(stringTp, intTp), ft(stringTp, intTp), ft(stringTp, intTp), []int{}, true, 1},
+		{[]int{0}, ft(intTp), ft(intTp), ft(uintTp), []int{}, true, 1},
+		// the row length is not guaranteed to be at least 4 bytes, so nullmap is 4-byte aligned
+		{[]int{0}, ft(stringTp), ft(stringTp), ft(stringTp), []int{}, true, 4},
+		{[]int{0, 1}, ft(stringTp, stringTp), ft(stringTp, stringTp), ft(stringTp, stringTp), []int{}, true, 4},
+	}
+	for index, test := range testCases {
+		meta := newTableMeta(test.buildKeyIndex, test.buildTypes, test.buildKeyTypes, test.probeKeyTypes, nil, test.outputColumns, test.needUsedFlag)
+		require.Equal(t, test.nullMapLength, meta.nullMapLength, "test index: "+strconv.Itoa(index))
+	}
+}
diff --git a/pkg/executor/join/row_table_builder.go b/pkg/executor/join/row_table_builder.go
new file mode 100644
index 0000000000000..dcc3cae92a219
--- /dev/null
+++ b/pkg/executor/join/row_table_builder.go
@@ -0,0 +1,281 @@
+// Copyright 2024 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package join
+
+import (
+	"hash/fnv"
+	"unsafe"
+
+	"github.com/pingcap/tidb/pkg/expression"
+	"github.com/pingcap/tidb/pkg/types"
+	"github.com/pingcap/tidb/pkg/util/chunk"
+	"github.com/pingcap/tidb/pkg/util/codec"
+)
+
+// rowTableBuilder converts build-side chunks into row table segments. It
+// keeps per-chunk scratch buffers (serialized keys, hash values, partition
+// indexes, filter/null vectors) that are reused from one chunk to the next.
+type rowTableBuilder struct {
+	buildKeyIndex    []int
+	buildKeyTypes    []*types.FieldType
+	hasNullableKey   bool
+	hasFilter        bool
+	keepFilteredRows bool
+
+	serializedKeyVectorBuffer [][]byte
+	partIdxVector             []int
+	selRows                   []int
+	usedRows                  []int
+	hashValue                 []uint64
+	firstSegRowSizeHint       uint
+	// filterVector and nullKeyVector are indexed by physical row index because the return vector of VectorizedFilter is based on physical row index
+	filterVector  []bool // if there is a filter before probe, filterVector saves the filter result
+	nullKeyVector []bool // nullKeyVector[i] = true if any of the keys is null
+
+	// rowNumberInCurrentRowTableSeg[p] counts the rows appended to the current segment of partition p
+	rowNumberInCurrentRowTableSeg []int64
+}
+
+// createRowTableBuilder returns a rowTableBuilder for the given build keys
+// with one row counter per partition.
+func createRowTableBuilder(buildKeyIndex []int, buildKeyTypes []*types.FieldType, partitionNumber uint, hasNullableKey bool, hasFilter bool, keepFilteredRows bool) *rowTableBuilder {
+	return &rowTableBuilder{
+		buildKeyIndex:                 buildKeyIndex,
+		buildKeyTypes:                 buildKeyTypes,
+		rowNumberInCurrentRowTableSeg: make([]int64, partitionNumber),
+		hasNullableKey:                hasNullableKey,
+		hasFilter:                     hasFilter,
+		keepFilteredRows:              keepFilteredRows,
+	}
+}
+
+// initHashValueAndPartIndexForOneChunk fills hashValue and partIdxVector for
+// every used row of the current chunk. Rows rejected by the filter or having
+// a null key get no real hash; they are spread round-robin over the
+// partitions through a fake partition index.
+func (b *rowTableBuilder) initHashValueAndPartIndexForOneChunk(partitionMaskOffset int, partitionNumber uint) {
+	h := fnv.New64()
+	fakePartIndex := uint64(0)
+	for logicalRowIndex, physicalRowIndex := range b.usedRows {
+		if (b.filterVector != nil && !b.filterVector[physicalRowIndex]) || (b.nullKeyVector != nil && b.nullKeyVector[physicalRowIndex]) {
+			b.hashValue[logicalRowIndex] = fakePartIndex
+			b.partIdxVector[logicalRowIndex] = int(fakePartIndex)
+			fakePartIndex = (fakePartIndex + 1) % uint64(partitionNumber)
+			continue
+		}
+		// hash.Hash documents that Write never returns an error.
+		_, _ = h.Write(b.serializedKeyVectorBuffer[logicalRowIndex])
+		hash := h.Sum64()
+		b.hashValue[logicalRowIndex] = hash
+		b.partIdxVector[logicalRowIndex] = int(hash >> partitionMaskOffset)
+		h.Reset()
+	}
+}
+
+// processOneChunk runs the build filter (if any), serializes the join keys,
+// computes hash values and partition indexes, and appends the rows of chk to
+// the row table. The SQL killer is checked between the steps.
+func (b *rowTableBuilder) processOneChunk(chk *chunk.Chunk, typeCtx types.Context, hashJoinCtx *HashJoinCtxV2, workerID int) error {
+	b.ResetBuffer(chk)
+	b.firstSegRowSizeHint = max(uint(1), uint(float64(len(b.usedRows))/float64(hashJoinCtx.partitionNumber)*float64(1.2)))
+	var err error
+	if b.hasFilter {
+		b.filterVector, err = expression.VectorizedFilter(hashJoinCtx.SessCtx.GetExprCtx().GetEvalCtx(), hashJoinCtx.SessCtx.GetSessionVars().EnableVectorizedExpression, hashJoinCtx.BuildFilter, chunk.NewIterator4Chunk(chk), b.filterVector)
+		if err != nil {
+			return err
+		}
+	}
+	err = checkSQLKiller(&hashJoinCtx.SessCtx.GetSessionVars().SQLKiller, "killedDuringBuild")
+	if err != nil {
+		return err
+	}
+	// 1. split partition
+	for index, colIdx := range b.buildKeyIndex {
+		err = codec.SerializeKeys(typeCtx, chk, b.buildKeyTypes[index], colIdx, b.usedRows, b.filterVector, b.nullKeyVector, hashJoinCtx.hashTableMeta.serializeModes[index], b.serializedKeyVectorBuffer)
+		if err != nil {
+			return err
+		}
+	}
+	err = checkSQLKiller(&hashJoinCtx.SessCtx.GetSessionVars().SQLKiller, "killedDuringBuild")
+	if err != nil {
+		return err
+	}
+
+	b.initHashValueAndPartIndexForOneChunk(hashJoinCtx.partitionMaskOffset, hashJoinCtx.partitionNumber)
+
+	// 2. build rowtable
+	return b.appendToRowTable(chk, hashJoinCtx, workerID)
+}
+
+// resizeSlice returns a slice of length newSize, reusing the backing array of
+// s when its capacity suffices. Reused elements keep their old values.
+func resizeSlice[T int | uint64 | bool](s []T, newSize int) []T {
+	if cap(s) >= newSize {
+		return s[:newSize]
+	}
+	return make([]T, newSize)
+}
+
+// ResetBuffer sizes the scratch buffers for chk and resets the ones that must
+// start empty (nullKeyVector and the serialized key buffers).
+func (b *rowTableBuilder) ResetBuffer(chk *chunk.Chunk) {
+	b.usedRows = chk.Sel()
+	logicalRows := chk.NumRows()
+	// NOTE(review): reads column 0, so chk is assumed to have at least one column.
+	physicalRows := chk.Column(0).Rows()
+
+	if b.usedRows == nil {
+		// no selection vector: every physical row is used, in order
+		b.selRows = resizeSlice(b.selRows, logicalRows)
+		for i := range b.selRows {
+			b.selRows[i] = i
+		}
+		b.usedRows = b.selRows
+	}
+	b.partIdxVector = resizeSlice(b.partIdxVector, logicalRows)
+	b.hashValue = resizeSlice(b.hashValue, logicalRows)
+	if b.hasFilter {
+		b.filterVector = resizeSlice(b.filterVector, physicalRows)
+	}
+	if b.hasNullableKey {
+		b.nullKeyVector = resizeSlice(b.nullKeyVector, physicalRows)
+		clear(b.nullKeyVector)
+	}
+	if cap(b.serializedKeyVectorBuffer) >= logicalRows {
+		b.serializedKeyVectorBuffer = b.serializedKeyVectorBuffer[:logicalRows]
+		for i := range b.serializedKeyVectorBuffer {
+			// keep each key buffer's capacity, drop its content
+			b.serializedKeyVectorBuffer[i] = b.serializedKeyVectorBuffer[i][:0]
+		}
+	} else {
+		b.serializedKeyVectorBuffer = make([][]byte, logicalRows)
+	}
+}
+
+// appendRemainingRowLocations finalizes the current segment of every
+// partition that still has rows in it.
+func (b *rowTableBuilder) appendRemainingRowLocations(workerID int, htCtx *hashTableContext) {
+	for partID := 0; partID < int(htCtx.hashTable.partitionNumber); partID++ {
+		if b.rowNumberInCurrentRowTableSeg[partID] > 0 {
+			htCtx.finalizeCurrentSeg(workerID, partID, b)
+		}
+	}
+}
+
+// fillNullMap appends the null bitmap of row to seg.rawData and returns the
+// number of bytes appended (0 when the meta has no null map). The bit for a
+// column is set when the column is NULL; bits are numbered from the most
+// significant bit of each byte.
+func fillNullMap(rowTableMeta *joinTableMeta, row *chunk.Row, seg *rowTableSegment) int {
+	nullMapLength := rowTableMeta.nullMapLength
+	if nullMapLength <= 0 {
+		return 0
+	}
+	// Extend rawData with zeroed bytes and set the bits in place, so that no
+	// temporary bitmap has to be built for every row.
+	start := len(seg.rawData)
+	seg.rawData = append(seg.rawData, make([]byte, nullMapLength)...)
+	bitmap := seg.rawData[start:]
+	for colIndexInRowTable, colIndexInRow := range rowTableMeta.rowColumnsOrder {
+		if row.IsNull(colIndexInRow) {
+			colIndexInBitMap := colIndexInRowTable + rowTableMeta.colOffsetInNullMap
+			bitmap[colIndexInBitMap/8] |= 1 << (7 - colIndexInBitMap%8)
+		}
+	}
+	return nullMapLength
+}
+
+// fillNextRowPtr appends the placeholder for the next-row pointer to
+// seg.rawData and returns sizeOfNextPtr.
+func fillNextRowPtr(seg *rowTableSegment) int {
+	seg.rawData = append(seg.rawData, fakeAddrPlaceHolder...)
+	return sizeOfNextPtr
+}
+
+// fillSerializedKeyAndKeyLengthIfNeeded appends the key length (when the join
+// keys are not fixed length) and the serialized key (when the join keys are
+// not inlined) to seg.rawData, and returns the number of bytes accounted for.
+func (b *rowTableBuilder) fillSerializedKeyAndKeyLengthIfNeeded(rowTableMeta *joinTableMeta, hasValidKey bool, logicalRowIndex int, seg *rowTableSegment) int {
+	appendRowLength := 0
+	// 1. fill key length if needed
+	if !rowTableMeta.isJoinKeysFixedLength {
+		// if join_key is not fixed length: `key_length` needs to be written in rawData
+		// even if the join keys are inlined, for example if the join key is 2 binary strings
+		// then the inlined join key should be: col1_size + col1_data + col2_size + col2_data
+		// and len(col1_size + col1_data + col2_size + col2_data) needs to be written before the inlined join key
+		length := uint64(0) // stays 0 when the row has no valid key
+		if hasValidKey {
+			length = uint64(len(b.serializedKeyVectorBuffer[logicalRowIndex]))
+		}
+		seg.rawData = append(seg.rawData, unsafe.Slice((*byte)(unsafe.Pointer(&length)), sizeOfLengthField)...)
+		appendRowLength += sizeOfLengthField
+	}
+	// 2. fill serialized key if needed
+	if !rowTableMeta.isJoinKeysInlined {
+		// if join_key is not inlined: `serialized_key` needs to be written in rawData
+		if hasValidKey {
+			seg.rawData = append(seg.rawData, b.serializedKeyVectorBuffer[logicalRowIndex]...)
+			appendRowLength += len(b.serializedKeyVectorBuffer[logicalRowIndex])
+		} else if rowTableMeta.isJoinKeysFixedLength {
+			// if there is no valid key, and the key is fixed length, then write a fake key
+			seg.rawData = append(seg.rawData, rowTableMeta.fakeKeyByte...)
+			appendRowLength += rowTableMeta.joinKeysLength
+		}
+		// otherwise nothing needs to be written since the length is 0
+	}
+	return appendRowLength
+}
+
+// fillRowData appends the columns listed in rowColumnsOrder to seg.rawData and
+// returns the number of bytes accounted for. Columns with a positive
+// columnsSize are written raw; the others are written as length + raw data.
+func fillRowData(rowTableMeta *joinTableMeta, row *chunk.Row, seg *rowTableSegment) int {
+	appendRowLength := 0
+	for index, colIdx := range rowTableMeta.rowColumnsOrder {
+		raw := row.GetRaw(colIdx)
+		if size := rowTableMeta.columnsSize[index]; size > 0 {
+			// fixed size
+			seg.rawData = append(seg.rawData, raw...)
+			appendRowLength += size
+			continue
+		}
+		// length, raw_data
+		length := uint64(len(raw))
+		seg.rawData = append(seg.rawData, unsafe.Slice((*byte)(unsafe.Pointer(&length)), sizeOfLengthField)...)
+		appendRowLength += sizeOfLengthField
+		seg.rawData = append(seg.rawData, raw...)
+		appendRowLength += int(length)
+	}
+	return appendRowLength
+}
+
+// appendToRowTable appends the used rows of chk to the current row table
+// segment of their partition, starting a new segment when the current one is
+// full. Rows without a valid key are skipped unless keepFilteredRows is set.
+func (b *rowTableBuilder) appendToRowTable(chk *chunk.Chunk, hashJoinCtx *HashJoinCtxV2, workerID int) error {
+	rowTableMeta := hashJoinCtx.hashTableMeta
+	for logicalRowIndex, physicalRowIndex := range b.usedRows {
+		// check the SQL killer every 10 rows and on the last row
+		if logicalRowIndex%10 == 0 || logicalRowIndex == len(b.usedRows)-1 {
+			err := checkSQLKiller(&hashJoinCtx.SessCtx.GetSessionVars().SQLKiller, "killedDuringBuild")
+			if err != nil {
+				return err
+			}
+		}
+		hasValidKey := (!b.hasFilter || b.filterVector[physicalRowIndex]) && (!b.hasNullableKey || !b.nullKeyVector[physicalRowIndex])
+		if !hasValidKey && !b.keepFilteredRows {
+			continue
+		}
+		// need to append the row to rowTable
+		var (
+			row     = chk.GetRow(logicalRowIndex)
+			partIdx = b.partIdxVector[logicalRowIndex]
+			seg     *rowTableSegment
+		)
+		seg = hashJoinCtx.hashTableContext.getCurrentRowSegment(workerID, partIdx, rowTableMeta, true, b.firstSegRowSizeHint)
+		// first check if current seg is full
+		if b.rowNumberInCurrentRowTableSeg[partIdx] >= maxRowTableSegmentSize || len(seg.rawData) >= maxRowTableSegmentByteSize {
+			// finalize current seg and create a new seg
+			hashJoinCtx.hashTableContext.finalizeCurrentSeg(workerID, partIdx, b)
+			seg = hashJoinCtx.hashTableContext.getCurrentRowSegment(workerID, partIdx, rowTableMeta, true, b.firstSegRowSizeHint)
+		}
+		if hasValidKey {
+			seg.validJoinKeyPos = append(seg.validJoinKeyPos, len(seg.hashValues))
+		}
+		seg.hashValues = append(seg.hashValues, b.hashValue[logicalRowIndex])
+		seg.rowStartOffset = append(seg.rowStartOffset, uint64(len(seg.rawData)))
+		rowLength := 0
+		// fill next_row_ptr field
+		rowLength += fillNextRowPtr(seg)
+		// fill null_map
+		rowLength += fillNullMap(rowTableMeta, &row, seg)
+		// fill serialized key and key length if needed
+		rowLength += b.fillSerializedKeyAndKeyLengthIfNeeded(rowTableMeta, hasValidKey, logicalRowIndex, seg)
+		// fill row data
+		rowLength += fillRowData(rowTableMeta, &row, seg)
+		// pad the row so that rowLength is a multiple of 8 bytes
+		// (assumes fakeAddrPlaceHolder holds at least 7 bytes — TODO confirm)
+		if rowLength%8 != 0 {
+			seg.rawData = append(seg.rawData, fakeAddrPlaceHolder[:8-rowLength%8]...)
+		}
+		b.rowNumberInCurrentRowTableSeg[partIdx]++
+	}
+	return nil
+}