diff --git a/pkg/executor/aggregate/agg_hash_executor.go b/pkg/executor/aggregate/agg_hash_executor.go index a3bc512ece2fb..7cf3608ab40ae 100644 --- a/pkg/executor/aggregate/agg_hash_executor.go +++ b/pkg/executor/aggregate/agg_hash_executor.go @@ -126,9 +126,9 @@ type HashAggExec struct { stats *HashAggRuntimeStats - // listInDisk is the chunks to store row values for spilled data. + // dataInDisk is the chunks to store row values for spilled data. // The HashAggExec may be set to `spill mode` multiple times, and all spilled data will be appended to DataInDiskByRows. - listInDisk *chunk.DataInDiskByRows + dataInDisk *chunk.DataInDiskByChunks // numOfSpilledChks indicates the number of all the spilled chunks. numOfSpilledChks int // offsetOfSpilledChks indicates the offset of the chunk be read from the disk. @@ -152,21 +152,21 @@ func (e *HashAggExec) Close() error { defer e.Ctx().GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.ID(), e.stats) } if e.IsUnparallelExec { - var firstErr error e.childResult = nil e.groupSet, _ = set.NewStringSetWithMemoryUsage() e.partialResultMap = nil if e.memTracker != nil { e.memTracker.ReplaceBytesUsed(0) } - if e.listInDisk != nil { - firstErr = e.listInDisk.Close() + if e.dataInDisk != nil { + e.dataInDisk.Close() } e.spillAction, e.tmpChkForSpill = nil, nil - if err := e.BaseExecutor.Close(); firstErr == nil { - firstErr = err + err := e.BaseExecutor.Close() + if err != nil { + return err } - return firstErr + return nil } if e.parallelExecValid { // `Close` may be called after `Open` without calling `Next` in test. @@ -240,13 +240,13 @@ func (e *HashAggExec) initForUnparallelExec() { e.offsetOfSpilledChks, e.numOfSpilledChks = 0, 0 e.executed, e.isChildDrained = false, false - e.listInDisk = chunk.NewDataInDiskByRows(exec.RetTypes(e.Children(0))) + e.dataInDisk = chunk.NewDataInDiskByChunks(exec.RetTypes(e.Children(0))) e.tmpChkForSpill = exec.TryNewCacheChunk(e.Children(0)) if vars := e.Ctx().GetSessionVars(); vars.TrackAggregateMemoryUsage && variable.EnableTmpStorageOnOOM.Load() { e.diskTracker = disk.NewTracker(e.ID(), -1) e.diskTracker.AttachTo(vars.StmtCtx.DiskTracker) - e.listInDisk.GetDiskTracker().AttachTo(e.diskTracker) + e.dataInDisk.GetDiskTracker().AttachTo(e.diskTracker) vars.MemTracker.FallbackOldAndSetNewActionForSoftLimit(e.ActionSpill()) } } @@ -553,8 +553,8 @@ func (e *HashAggExec) resetSpillMode() { e.partialResultMap = make(AggPartialResultMapper) e.bInMap = 0 e.prepared = false - e.executed = e.numOfSpilledChks == e.listInDisk.NumChunks() // No data is spilling again, all data have been processed. - e.numOfSpilledChks = e.listInDisk.NumChunks() + e.executed = e.numOfSpilledChks == e.dataInDisk.NumChunks() // No data is spilling again, all data have been processed. 
+ e.numOfSpilledChks = e.dataInDisk.NumChunks() e.memTracker.ReplaceBytesUsed(setSize) atomic.StoreUint32(&e.inSpillMode, 0) } @@ -563,7 +563,7 @@ func (e *HashAggExec) resetSpillMode() { func (e *HashAggExec) execute(ctx context.Context) (err error) { defer func() { if e.tmpChkForSpill.NumRows() > 0 && err == nil { - err = e.listInDisk.Add(e.tmpChkForSpill) + err = e.dataInDisk.Add(e.tmpChkForSpill) e.tmpChkForSpill.Reset() } }() @@ -633,12 +633,12 @@ func (e *HashAggExec) execute(ctx context.Context) (err error) { func (e *HashAggExec) spillUnprocessedData(isFullChk bool) (err error) { if isFullChk { - return e.listInDisk.Add(e.childResult) + return e.dataInDisk.Add(e.childResult) } for i := 0; i < e.childResult.NumRows(); i++ { e.tmpChkForSpill.AppendRow(e.childResult.GetRow(i)) if e.tmpChkForSpill.IsFull() { - err = e.listInDisk.Add(e.tmpChkForSpill) + err = e.dataInDisk.Add(e.tmpChkForSpill) if err != nil { return err } @@ -660,7 +660,7 @@ func (e *HashAggExec) getNextChunk(ctx context.Context) (err error) { e.isChildDrained = true } if e.offsetOfSpilledChks < e.numOfSpilledChks { - e.childResult, err = e.listInDisk.GetChunk(e.offsetOfSpilledChks) + e.childResult, err = e.dataInDisk.GetChunk(e.offsetOfSpilledChks) if err != nil { return err } diff --git a/pkg/util/chunk/BUILD.bazel b/pkg/util/chunk/BUILD.bazel index 8aa1fa4f47f36..8f1afb80d6a49 100644 --- a/pkg/util/chunk/BUILD.bazel +++ b/pkg/util/chunk/BUILD.bazel @@ -5,6 +5,7 @@ go_library( srcs = [ "alloc.go", "chunk.go", + "chunk_in_disk.go", "chunk_util.go", "codec.go", "column.go", @@ -43,6 +44,7 @@ go_test( timeout = "short", srcs = [ "alloc_test.go", + "chunk_in_disk_test.go", "chunk_test.go", "chunk_util_test.go", "codec_test.go", diff --git a/pkg/util/chunk/chunk.go b/pkg/util/chunk/chunk.go index 6242de8450633..5fd8a1ecdd7f5 100644 --- a/pkg/util/chunk/chunk.go +++ b/pkg/util/chunk/chunk.go @@ -54,6 +54,18 @@ const ( ZeroCapacity = 0 ) +// NewEmptyChunk creates an empty chunk +func NewEmptyChunk(fields []*types.FieldType) *Chunk { + chk := &Chunk{ + columns: make([]*Column, 0, len(fields)), + } + + for _, f := range fields { + chk.columns = append(chk.columns, NewEmptyColumn(f)) + } + return chk +} + // NewChunkWithCapacity creates a new chunk with field types and capacity. func NewChunkWithCapacity(fields []*types.FieldType, capacity int) *Chunk { return New(fields, capacity, capacity) diff --git a/pkg/util/chunk/chunk_in_disk.go b/pkg/util/chunk/chunk_in_disk.go new file mode 100644 index 0000000000000..f648f2c1bf453 --- /dev/null +++ b/pkg/util/chunk/chunk_in_disk.go @@ -0,0 +1,317 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package chunk + +import ( + "io" + "os" + "strconv" + "unsafe" + + errors2 "github.com/pingcap/errors" + "github.com/pingcap/tidb/pkg/parser/terror" + "github.com/pingcap/tidb/pkg/types" + "github.com/pingcap/tidb/pkg/util/disk" + "github.com/pingcap/tidb/pkg/util/memory" +) + +const byteLen = int64(unsafe.Sizeof(byte(0))) +const intLen = int64(unsafe.Sizeof(int(0))) +const int64Len = int64(unsafe.Sizeof(int64(0))) + +const chkFixedSize = intLen * 4 +const colMetaSize = int64Len * 4 + +const defaultChunkDataInDiskByChunksPath = "defaultChunkDataInDiskByChunksPath" + +// DataInDiskByChunks represents some data stored in temporary disk. +// They can only be restored by chunks. +type DataInDiskByChunks struct { + fieldTypes []*types.FieldType + offsetOfEachChunk []int64 + + totalDataSize int64 + totalRowNum int64 + diskTracker *disk.Tracker // track disk usage. + + dataFile diskFileReaderWriter + + // Write or read data needs this buffer to temporarily store data + buf []byte +} + +// NewDataInDiskByChunks creates a new DataInDiskByChunks with field types. +func NewDataInDiskByChunks(fieldTypes []*types.FieldType) *DataInDiskByChunks { + d := &DataInDiskByChunks{ + fieldTypes: fieldTypes, + totalDataSize: 0, + totalRowNum: 0, + // TODO: set the quota of disk usage. + diskTracker: disk.NewTracker(memory.LabelForChunkDataInDiskByChunks, -1), + buf: make([]byte, 0, 4096), + } + return d +} + +func (d *DataInDiskByChunks) initDiskFile() (err error) { + err = disk.CheckAndInitTempDir() + if err != nil { + return + } + err = d.dataFile.initWithFileName(defaultChunkDataInDiskByChunksPath + strconv.Itoa(d.diskTracker.Label())) + return +} + +// GetDiskTracker returns the memory tracker of this List. +func (d *DataInDiskByChunks) GetDiskTracker() *disk.Tracker { + return d.diskTracker +} + +// Add adds a chunk to the DataInDiskByChunks. Caller must make sure the input chk has the same field types. +// Warning: Do not concurrently call this function. +func (d *DataInDiskByChunks) Add(chk *Chunk) (err error) { + if chk.NumRows() == 0 { + return errors2.New("Chunk spilled to disk should have at least 1 row") + } + + if d.dataFile.file == nil { + err = d.initDiskFile() + if err != nil { + return + } + } + + serializedBytesNum := d.serializeDataToBuf(chk) + + var writeNum int + writeNum, err = d.dataFile.write(d.buf) + if err != nil { + return + } + + if int64(writeNum) != serializedBytesNum { + return errors2.New("Some data fail to be spilled to disk") + } + d.offsetOfEachChunk = append(d.offsetOfEachChunk, d.totalDataSize) + d.totalDataSize += serializedBytesNum + d.totalRowNum += int64(chk.NumRows()) + d.dataFile.offWrite += serializedBytesNum + + d.diskTracker.Consume(serializedBytesNum) + return +} + +func (d *DataInDiskByChunks) getChunkSize(chkIdx int) int64 { + totalChunkNum := len(d.offsetOfEachChunk) + if chkIdx == totalChunkNum-1 { + return d.totalDataSize - d.offsetOfEachChunk[chkIdx] + } + return d.offsetOfEachChunk[chkIdx+1] - d.offsetOfEachChunk[chkIdx] +} + +// GetChunk gets a Chunk from the DataInDiskByChunks by chkIdx. 
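+// It locates the chunk's serialized bytes via offsetOfEachChunk[chkIdx] and getChunkSize, reads them back
+// through the checksum (and optional decryption) reader returned by getSectionReader, and deserializes them
+// into a freshly allocated Chunk, so each call returns an independent chunk that the caller owns.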
+func (d *DataInDiskByChunks) GetChunk(chkIdx int) (*Chunk, error) { + reader := d.dataFile.getSectionReader(d.offsetOfEachChunk[chkIdx]) + chkSize := d.getChunkSize(chkIdx) + + if cap(d.buf) < int(chkSize) { + d.buf = make([]byte, chkSize) + } else { + d.buf = d.buf[:chkSize] + } + + readByteNum, err := io.ReadFull(reader, d.buf) + if err != nil { + return nil, err + } + + if int64(readByteNum) != chkSize { + return nil, errors2.New("Fail to restore the spilled chunk") + } + + chk := NewEmptyChunk(d.fieldTypes) + d.deserializeDataToChunk(chk) + + return chk, nil +} + +// Close releases the disk resource. +func (d *DataInDiskByChunks) Close() { + if d.dataFile.file != nil { + d.diskTracker.Consume(-d.diskTracker.BytesConsumed()) + terror.Call(d.dataFile.file.Close) + terror.Log(os.Remove(d.dataFile.file.Name())) + } +} + +func (d *DataInDiskByChunks) serializeColMeta(pos int64, length int64, nullMapSize int64, dataSize int64, offsetSize int64) { + *(*int64)(unsafe.Pointer(&d.buf[pos])) = length + *(*int64)(unsafe.Pointer(&d.buf[pos+int64Len])) = nullMapSize + *(*int64)(unsafe.Pointer(&d.buf[pos+int64Len*2])) = dataSize + *(*int64)(unsafe.Pointer(&d.buf[pos+int64Len*3])) = offsetSize +} + +func (d *DataInDiskByChunks) serializeOffset(pos *int64, offsets []int64, offsetSize int64) { + d.buf = d.buf[:*pos+offsetSize] + for _, offset := range offsets { + *(*int64)(unsafe.Pointer(&d.buf[*pos])) = offset + *pos += int64Len + } +} + +func (d *DataInDiskByChunks) serializeChunkData(pos *int64, chk *Chunk, selSize int64) { + d.buf = d.buf[:chkFixedSize] + *(*int)(unsafe.Pointer(&d.buf[*pos])) = chk.numVirtualRows + *(*int)(unsafe.Pointer(&d.buf[*pos+intLen])) = chk.capacity + *(*int)(unsafe.Pointer(&d.buf[*pos+intLen*2])) = chk.requiredRows + *(*int)(unsafe.Pointer(&d.buf[*pos+intLen*3])) = int(selSize) + *pos += chkFixedSize + + d.buf = d.buf[:*pos+selSize] + + selLen := len(chk.sel) + for i := 0; i < selLen; i++ { + *(*int)(unsafe.Pointer(&d.buf[*pos])) = chk.sel[i] + *pos += intLen + } +} + +func (d *DataInDiskByChunks) serializeColumns(pos *int64, chk *Chunk) { + for _, col := range chk.columns { + d.buf = d.buf[:*pos+colMetaSize] + nullMapSize := int64(len(col.nullBitmap)) * byteLen + dataSize := int64(len(col.data)) * byteLen + offsetSize := int64(len(col.offsets)) * int64Len + d.serializeColMeta(*pos, int64(col.length), nullMapSize, dataSize, offsetSize) + *pos += colMetaSize + + d.buf = append(d.buf, col.nullBitmap...) + d.buf = append(d.buf, col.data...) + *pos += nullMapSize + dataSize + d.serializeOffset(pos, col.offsets, offsetSize) + } +} + +// Serialized format of a chunk: +// chunk data: | numVirtualRows | capacity | requiredRows | selSize | sel... | +// column1 data: | length | nullMapSize | dataSize | offsetSize | nullBitmap... | data... | offsets... | +// column2 data: | length | nullMapSize | dataSize | offsetSize | nullBitmap... | data... | offsets... | +// ... +// columnN data: | length | nullMapSize | dataSize | offsetSize | nullBitmap... | data... | offsets... | +// +// `xxx...` means this is a variable field filled by bytes. 
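+//
+// As an illustrative example (assuming a 64-bit platform; not taken from the tests): a chunk holding one
+// fixed-length int64 column with 3 rows and no sel slice serializes to 32 bytes of chunk data
+// (numVirtualRows, capacity, requiredRows, selSize = 0), 32 bytes of column meta (length = 3,
+// nullMapSize = 1, dataSize = 24, offsetSize = 0), then 1 byte of null bitmap and 24 bytes of data,
+// 89 bytes in total. Fixed-length columns keep no offsets, so their offsetSize is always 0.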
+func (d *DataInDiskByChunks) serializeDataToBuf(chk *Chunk) int64 { + totalBytes := int64(0) + + // Calculate total memory that buffer needs + selSize := int64(len(chk.sel)) * intLen + totalBytes += chkFixedSize + selSize + for _, col := range chk.columns { + nullMapSize := int64(len(col.nullBitmap)) * byteLen + dataSize := int64(len(col.data)) * byteLen + offsetSize := int64(len(col.offsets)) * int64Len + totalBytes += colMetaSize + nullMapSize + dataSize + offsetSize + } + + if cap(d.buf) < int(totalBytes) { + d.buf = make([]byte, 0, totalBytes) + } + + pos := int64(0) + d.serializeChunkData(&pos, chk, selSize) + d.serializeColumns(&pos, chk) + return totalBytes +} + +func (d *DataInDiskByChunks) deserializeColMeta(pos *int64) (length int64, nullMapSize int64, dataSize int64, offsetSize int64) { + length = *(*int64)(unsafe.Pointer(&d.buf[*pos])) + *pos += int64Len + + nullMapSize = *(*int64)(unsafe.Pointer(&d.buf[*pos])) + *pos += int64Len + + dataSize = *(*int64)(unsafe.Pointer(&d.buf[*pos])) + *pos += int64Len + + offsetSize = *(*int64)(unsafe.Pointer(&d.buf[*pos])) + *pos += int64Len + return +} + +func (d *DataInDiskByChunks) deserializeSel(chk *Chunk, pos *int64, selSize int) { + selLen := int64(selSize) / intLen + chk.sel = make([]int, selLen) + for i := int64(0); i < selLen; i++ { + chk.sel[i] = *(*int)(unsafe.Pointer(&d.buf[*pos])) + *pos += intLen + } +} + +func (d *DataInDiskByChunks) deserializeChunkData(chk *Chunk, pos *int64) { + chk.numVirtualRows = *(*int)(unsafe.Pointer(&d.buf[*pos])) + *pos += intLen + + chk.capacity = *(*int)(unsafe.Pointer(&d.buf[*pos])) + *pos += intLen + + chk.requiredRows = *(*int)(unsafe.Pointer(&d.buf[*pos])) + *pos += intLen + + selSize := *(*int)(unsafe.Pointer(&d.buf[*pos])) + *pos += intLen + if selSize != 0 { + d.deserializeSel(chk, pos, selSize) + } +} + +func (d *DataInDiskByChunks) deserializeOffsets(dst []int64, pos *int64) { + offsetNum := len(dst) + for i := 0; i < offsetNum; i++ { + dst[i] = *(*int64)(unsafe.Pointer(&d.buf[*pos])) + *pos += int64Len + } +} + +func (d *DataInDiskByChunks) deserializeColumns(chk *Chunk, pos *int64) { + for _, col := range chk.columns { + length, nullMapSize, dataSize, offsetSize := d.deserializeColMeta(pos) + col.nullBitmap = make([]byte, nullMapSize) + col.data = make([]byte, dataSize) + col.offsets = make([]int64, offsetSize/int64Len) + + col.length = int(length) + copy(col.nullBitmap, d.buf[*pos:*pos+nullMapSize]) + *pos += nullMapSize + copy(col.data, d.buf[*pos:*pos+dataSize]) + *pos += dataSize + d.deserializeOffsets(col.offsets, pos) + } +} + +func (d *DataInDiskByChunks) deserializeDataToChunk(chk *Chunk) { + pos := int64(0) + d.deserializeChunkData(chk, &pos) + d.deserializeColumns(chk, &pos) +} + +// NumRows returns total spilled row number +func (d *DataInDiskByChunks) NumRows() int64 { + return d.totalRowNum +} + +// NumChunks returns total spilled chunk number +func (d *DataInDiskByChunks) NumChunks() int { + return len(d.offsetOfEachChunk) +} diff --git a/pkg/util/chunk/chunk_in_disk_test.go b/pkg/util/chunk/chunk_in_disk_test.go new file mode 100644 index 0000000000000..ce23c233ac0da --- /dev/null +++ b/pkg/util/chunk/chunk_in_disk_test.go @@ -0,0 +1,86 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package chunk + +import ( + "math/rand" + "testing" + + "github.com/stretchr/testify/require" +) + +func stripAuxDataForChunk(chk *Chunk) { + chk.capacity = 0 + chk.requiredRows = 0 + chk.numVirtualRows = 0 + chk.sel = nil +} + +func addAuxDataForChunks(chunks []*Chunk) { + for _, chk := range chunks { + chk.capacity = rand.Intn(100) + chk.requiredRows = rand.Intn(100) + chk.numVirtualRows = rand.Intn(100) + + selLen := rand.Intn(50) + 1 + chk.sel = make([]int, selLen) + for i := 0; i < selLen; i++ { + chk.sel[i] = rand.Int() + } + } +} + +func checkAuxDataForChunk(t *testing.T, chk1, chk2 *Chunk) { + require.Equal(t, chk1.capacity, chk2.capacity) + require.Equal(t, chk1.requiredRows, chk2.requiredRows) + require.Equal(t, chk1.numVirtualRows, chk2.numVirtualRows) + require.Equal(t, len(chk1.sel), len(chk2.sel)) + + length := len(chk1.sel) + for i := 0; i < length; i++ { + require.Equal(t, chk1.sel[i], chk2.sel[i]) + } +} + +func checkChunk(t *testing.T, chk1, chk2 *Chunk) { + checkAuxDataForChunk(t, chk1, chk2) + stripAuxDataForChunk(chk1) + stripAuxDataForChunk(chk2) + + require.Equal(t, chk1.NumRows(), chk2.NumRows()) + numRows := chk1.NumRows() + for i := 0; i < numRows; i++ { + checkRow(t, chk1.GetRow(i), chk2.GetRow(i)) + } +} + +func TestDataInDiskByChunks(t *testing.T) { + numChk, numRow := 100, 1000 + chks, fields := initChunks(numChk, numRow) + addAuxDataForChunks(chks) + dataInDiskByChunks := NewDataInDiskByChunks(fields) + defer dataInDiskByChunks.Close() + + for _, chk := range chks { + err := dataInDiskByChunks.Add(chk) + require.NoError(t, err) + } + + for i := 0; i < numChk; i++ { + chk, err := dataInDiskByChunks.GetChunk(i) + require.NoError(t, err) + checkChunk(t, chk, chks[i]) + } +} diff --git a/pkg/util/chunk/chunk_util.go b/pkg/util/chunk/chunk_util.go index 5e96d73e71fe9..b0157f2b2bdc7 100644 --- a/pkg/util/chunk/chunk_util.go +++ b/pkg/util/chunk/chunk_util.go @@ -14,7 +14,15 @@ package chunk -import "github.com/pingcap/errors" +import ( + "io" + "os" + + "github.com/pingcap/errors" + "github.com/pingcap/tidb/pkg/config" + "github.com/pingcap/tidb/pkg/util/checksum" + "github.com/pingcap/tidb/pkg/util/encrypt" +) // CopySelectedJoinRowsDirect directly copies the selected joined rows from the source Chunk // to the destination Chunk. @@ -165,3 +173,67 @@ func copySameOuterRows(outerColOffset, outerColLen int, src *Chunk, numRows int, } } } + +// diskFileReaderWriter represents a Reader and a Writer for the temporary disk file. +type diskFileReaderWriter struct { + file *os.File + writer io.WriteCloser + + // offWrite is the current offset for writing. + offWrite int64 + + checksumWriter *checksum.Writer + cipherWriter *encrypt.Writer // cipherWriter is only enable when config SpilledFileEncryptionMethod is "aes128-ctr" + + // ctrCipher stores the key and nonce using by aes encrypt io layer + ctrCipher *encrypt.CtrCipher +} + +func (l *diskFileReaderWriter) initWithFileName(fileName string) (err error) { + // `os.CreateTemp` will insert random string so that a random file name will be generated. 
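+	// The write path built below is layered: the raw file may be wrapped by an AES-CTR cipher writer
+	// (when SpilledFileEncryptionMethod is not "plaintext") and is always wrapped by a checksum writer,
+	// so getReader has to unwrap the same layers when the spilled data is read back.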
+ l.file, err = os.CreateTemp(config.GetGlobalConfig().TempStoragePath, fileName) + if err != nil { + return errors.Trace(err) + } + var underlying io.WriteCloser = l.file + if config.GetGlobalConfig().Security.SpilledFileEncryptionMethod != config.SpilledFileEncryptionMethodPlaintext { + // The possible values of SpilledFileEncryptionMethod are "plaintext", "aes128-ctr" + l.ctrCipher, err = encrypt.NewCtrCipher() + if err != nil { + return + } + l.cipherWriter = encrypt.NewWriter(l.file, l.ctrCipher) + underlying = l.cipherWriter + } + l.checksumWriter = checksum.NewWriter(underlying) + l.writer = l.checksumWriter + l.offWrite = 0 + return +} + +func (l *diskFileReaderWriter) getReader() io.ReaderAt { + var underlying io.ReaderAt = l.file + if l.ctrCipher != nil { + underlying = NewReaderWithCache(encrypt.NewReader(l.file, l.ctrCipher), l.cipherWriter.GetCache(), l.cipherWriter.GetCacheDataOffset()) + } + if l.checksumWriter != nil { + underlying = NewReaderWithCache(checksum.NewReader(underlying), l.checksumWriter.GetCache(), l.checksumWriter.GetCacheDataOffset()) + } + return underlying +} + +func (l *diskFileReaderWriter) getSectionReader(off int64) *io.SectionReader { + checksumReader := l.getReader() + r := io.NewSectionReader(checksumReader, off, l.offWrite-off) + return r +} + +func (l *diskFileReaderWriter) getWriter() io.Writer { + return l.writer +} + +func (l *diskFileReaderWriter) write(writeData []byte) (n int, err error) { + writeNum, err := l.writer.Write(writeData) + l.offWrite += int64(writeNum) + return writeNum, err +} diff --git a/pkg/util/chunk/column.go b/pkg/util/chunk/column.go index e83d939669690..2e423e1fb9a8f 100644 --- a/pkg/util/chunk/column.go +++ b/pkg/util/chunk/column.go @@ -82,6 +82,16 @@ func (DefaultColumnAllocator) NewColumn(ft *types.FieldType, capacity int) *Colu return newColumn(getFixedLen(ft), capacity) } +// NewEmptyColumn creates a new column with nothing. +func NewEmptyColumn(ft *types.FieldType) *Column { + elemLen := getFixedLen(ft) + col := Column{} + if elemLen != varElemLen { + col.elemBuf = make([]byte, elemLen) + } + return &col +} + // NewColumn creates a new column with the specific type and capacity. func NewColumn(ft *types.FieldType, capacity int) *Column { return newColumn(getFixedLen(ft), capacity) diff --git a/pkg/util/chunk/row_in_disk.go b/pkg/util/chunk/row_in_disk.go index 3fb7d09878d25..77c378eacc790 100644 --- a/pkg/util/chunk/row_in_disk.go +++ b/pkg/util/chunk/row_in_disk.go @@ -21,12 +21,9 @@ import ( "strconv" errors2 "github.com/pingcap/errors" - "github.com/pingcap/tidb/pkg/config" "github.com/pingcap/tidb/pkg/parser/terror" "github.com/pingcap/tidb/pkg/types" - "github.com/pingcap/tidb/pkg/util/checksum" "github.com/pingcap/tidb/pkg/util/disk" - "github.com/pingcap/tidb/pkg/util/encrypt" "github.com/pingcap/tidb/pkg/util/memory" ) @@ -43,61 +40,6 @@ type DataInDiskByRows struct { offsetFile diskFileReaderWriter } -// diskFileReaderWriter represents a Reader and a Writer for the temporary disk file. -type diskFileReaderWriter struct { - disk *os.File - w io.WriteCloser - // offWrite is the current offset for writing. 
- offWrite int64 - - checksumWriter *checksum.Writer - cipherWriter *encrypt.Writer // cipherWriter is only enable when config SpilledFileEncryptionMethod is "aes128-ctr" - - // ctrCipher stores the key and nonce using by aes encrypt io layer - ctrCipher *encrypt.CtrCipher -} - -func (l *diskFileReaderWriter) initWithFileName(fileName string) (err error) { - l.disk, err = os.CreateTemp(config.GetGlobalConfig().TempStoragePath, fileName) - if err != nil { - return errors2.Trace(err) - } - var underlying io.WriteCloser = l.disk - if config.GetGlobalConfig().Security.SpilledFileEncryptionMethod != config.SpilledFileEncryptionMethodPlaintext { - // The possible values of SpilledFileEncryptionMethod are "plaintext", "aes128-ctr" - l.ctrCipher, err = encrypt.NewCtrCipher() - if err != nil { - return - } - l.cipherWriter = encrypt.NewWriter(l.disk, l.ctrCipher) - underlying = l.cipherWriter - } - l.checksumWriter = checksum.NewWriter(underlying) - l.w = l.checksumWriter - return -} - -func (l *diskFileReaderWriter) getReader() io.ReaderAt { - var underlying io.ReaderAt = l.disk - if l.ctrCipher != nil { - underlying = NewReaderWithCache(encrypt.NewReader(l.disk, l.ctrCipher), l.cipherWriter.GetCache(), l.cipherWriter.GetCacheDataOffset()) - } - if l.checksumWriter != nil { - underlying = NewReaderWithCache(checksum.NewReader(underlying), l.checksumWriter.GetCache(), l.checksumWriter.GetCacheDataOffset()) - } - return underlying -} - -func (l *diskFileReaderWriter) getSectionReader(off int64) *io.SectionReader { - checksumReader := l.getReader() - r := io.NewSectionReader(checksumReader, off, l.offWrite-off) - return r -} - -func (l *diskFileReaderWriter) getWriter() io.Writer { - return l.w -} - var defaultChunkDataInDiskByRowsPath = "chunk.DataInDiskByRows" var defaultChunkDataInDiskByRowsOffsetPath = "chunk.DataInDiskByRowsOffset" @@ -141,7 +83,7 @@ func (l *DataInDiskByRows) Add(chk *Chunk) (err error) { if chk.NumRows() == 0 { return errors2.New("chunk appended to List should have at least 1 row") } - if l.dataFile.disk == nil { + if l.dataFile.file == nil { err = l.initDiskFile() if err != nil { return @@ -255,14 +197,14 @@ func (l *DataInDiskByRows) NumChunks() int { // Close releases the disk resource. 
func (l *DataInDiskByRows) Close() error { - if l.dataFile.disk != nil { + if l.dataFile.file != nil { l.diskTracker.Consume(-l.diskTracker.BytesConsumed()) - terror.Call(l.dataFile.disk.Close) - terror.Log(os.Remove(l.dataFile.disk.Name())) + terror.Call(l.dataFile.file.Close) + terror.Log(os.Remove(l.dataFile.file.Name())) } - if l.offsetFile.disk != nil { - terror.Call(l.offsetFile.disk.Close) - terror.Log(os.Remove(l.offsetFile.disk.Name())) + if l.offsetFile.file != nil { + terror.Call(l.offsetFile.file.Close) + terror.Log(os.Remove(l.offsetFile.file.Name())) } return nil } diff --git a/pkg/util/chunk/row_in_disk_test.go b/pkg/util/chunk/row_in_disk_test.go index 453b32be292f8..c463beb03b63d 100644 --- a/pkg/util/chunk/row_in_disk_test.go +++ b/pkg/util/chunk/row_in_disk_test.go @@ -16,7 +16,6 @@ package chunk import ( "bytes" - "fmt" "io" "math/rand" "os" @@ -34,6 +33,16 @@ import ( "github.com/stretchr/testify/require" ) +func genString() string { + retStr := "西xi瓜gua" + factor := rand.Intn(5) + for i := 0; i < factor; i++ { + retStr += retStr + } + + return retStr +} + func initChunks(numChk, numRow int) ([]*Chunk, []*types.FieldType) { fields := []*types.FieldType{ types.NewFieldType(mysql.TypeVarString), @@ -48,12 +57,12 @@ func initChunks(numChk, numRow int) ([]*Chunk, []*types.FieldType) { chk := NewChunkWithCapacity(fields, numRow) for rowIdx := 0; rowIdx < numRow; rowIdx++ { data := int64(chkIdx*numRow + rowIdx) - chk.AppendString(0, fmt.Sprint(data)) + chk.AppendString(0, genString()) chk.AppendNull(1) chk.AppendNull(2) chk.AppendInt64(3, data) if chkIdx%2 == 0 { - chk.AppendJSON(4, types.CreateBinaryJSON(fmt.Sprint(data))) + chk.AppendJSON(4, types.CreateBinaryJSON(genString())) } else { chk.AppendNull(4) } @@ -70,15 +79,15 @@ func TestDataInDiskByRows(t *testing.T) { defer func() { err := l.Close() require.NoError(t, err) - require.NotNil(t, l.dataFile.disk) - _, err = os.Stat(l.dataFile.disk.Name()) + require.NotNil(t, l.dataFile.file) + _, err = os.Stat(l.dataFile.file.Name()) require.True(t, os.IsNotExist(err)) }() for _, chk := range chks { err := l.Add(chk) assert.NoError(t, err) } - require.True(t, strings.HasPrefix(l.dataFile.disk.Name(), filepath.Join(os.TempDir(), "tidb_enable_tmp_storage_on_oom"))) + require.True(t, strings.HasPrefix(l.dataFile.file.Name(), filepath.Join(os.TempDir(), "tidb_enable_tmp_storage_on_oom"))) assert.Equal(t, numChk, l.NumChunks()) assert.Greater(t, l.GetDiskTracker().BytesConsumed(), int64(0)) @@ -143,14 +152,14 @@ type listInDiskWriteDisk struct { } func (l *diskFileReaderWriter) flushForTest() (err error) { - err = l.disk.Close() + err = l.file.Close() if err != nil { return } - l.w = nil + l.writer = nil // the l.disk is the underlying object of the l.w, it will be closed // after calling l.w.Close, we need to reopen it before reading rows. 
- l.disk, err = os.Open(l.disk.Name()) + l.file, err = os.Open(l.file.Name()) if err != nil { return errors2.Trace(err) } @@ -163,15 +172,15 @@ func newDataInDiskByRowsWriteDisk(fieldTypes []*types.FieldType) (*listInDiskWri if err != nil { return nil, err } - l.dataFile.disk = disk - l.dataFile.w = disk + l.dataFile.file = disk + l.dataFile.writer = disk disk2, err := os.CreateTemp(config.GetGlobalConfig().TempStoragePath, "offset"+strconv.Itoa(l.diskTracker.Label())) if err != nil { return nil, err } - l.offsetFile.disk = disk2 - l.offsetFile.w = disk2 + l.offsetFile.file = disk2 + l.offsetFile.writer = disk2 return &l, nil } @@ -185,7 +194,7 @@ func (l *listInDiskWriteDisk) GetRow(ptr RowPtr) (row Row, err error) { return } - r := io.NewSectionReader(l.dataFile.disk, off, l.dataFile.offWrite-off) + r := io.NewSectionReader(l.dataFile.file, off, l.dataFile.offWrite-off) format := rowInDisk{numCol: len(l.fieldTypes)} _, err = format.ReadFrom(r) if err != nil { diff --git a/pkg/util/memory/tracker.go b/pkg/util/memory/tracker.go index e60987066b667..bbcd7f73a4936 100644 --- a/pkg/util/memory/tracker.go +++ b/pkg/util/memory/tracker.go @@ -839,6 +839,8 @@ const ( LabelForMemDB int = -28 // LabelForCursorFetch represents the label of the execution of cursor fetch LabelForCursorFetch int = -29 + // LabelForChunkDataInDiskByChunks represents the label of the chunk list in disk + LabelForChunkDataInDiskByChunks int = -30 ) // MetricsTypes is used to get label for metrics
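
For reference, here is a minimal, hypothetical sketch (not part of the change) of how the new DataInDiskByChunks API fits together, mirroring chunk_in_disk_test.go above; the schema and values are made up for illustration:

package main

import (
	"fmt"

	"github.com/pingcap/tidb/pkg/parser/mysql"
	"github.com/pingcap/tidb/pkg/types"
	"github.com/pingcap/tidb/pkg/util/chunk"
)

func spillAndRestore() error {
	// Build an on-disk store for a single BIGINT column.
	fields := []*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}
	d := chunk.NewDataInDiskByChunks(fields)
	defer d.Close() // removes the temporary file and releases the tracked disk bytes

	// Spill one chunk; Add serializes it into the temporary file and charges the disk tracker.
	chk := chunk.NewChunkWithCapacity(fields, 4)
	chk.AppendInt64(0, 1)
	chk.AppendInt64(0, 2)
	if err := d.Add(chk); err != nil {
		return err
	}

	// Restore every spilled chunk by index.
	for i := 0; i < d.NumChunks(); i++ {
		restored, err := d.GetChunk(i)
		if err != nil {
			return err
		}
		fmt.Println(restored.NumRows(), d.NumRows(), d.GetDiskTracker().BytesConsumed())
	}
	return nil
}

Unlike DataInDiskByRows, data is restored a whole chunk at a time rather than row by row, which is why HashAggExec only needs dataInDisk.GetChunk in getNextChunk.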