Skip to content

Commit

Permalink
executor: add DataInDiskByChunks to spill and restore data in chunks (
Browse files Browse the repository at this point in the history
  • Loading branch information
xzhangxian1008 authored Nov 30, 2023
1 parent 5850546 commit d1e87da
Show file tree
Hide file tree
Showing 10 changed files with 548 additions and 96 deletions.
32 changes: 16 additions & 16 deletions pkg/executor/aggregate/agg_hash_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,9 @@ type HashAggExec struct {

stats *HashAggRuntimeStats

// listInDisk is the chunks to store row values for spilled data.
// dataInDisk is the chunks to store row values for spilled data.
// The HashAggExec may be set to `spill mode` multiple times, and all spilled data will be appended to DataInDiskByChunks.
listInDisk *chunk.DataInDiskByRows
dataInDisk *chunk.DataInDiskByChunks
// numOfSpilledChks indicates the number of all the spilled chunks.
numOfSpilledChks int
// offsetOfSpilledChks indicates the offset of the chunk to be read from the disk.
Expand All @@ -152,21 +152,21 @@ func (e *HashAggExec) Close() error {
defer e.Ctx().GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.ID(), e.stats)
}
if e.IsUnparallelExec {
var firstErr error
e.childResult = nil
e.groupSet, _ = set.NewStringSetWithMemoryUsage()
e.partialResultMap = nil
if e.memTracker != nil {
e.memTracker.ReplaceBytesUsed(0)
}
if e.listInDisk != nil {
firstErr = e.listInDisk.Close()
if e.dataInDisk != nil {
e.dataInDisk.Close()
}
e.spillAction, e.tmpChkForSpill = nil, nil
if err := e.BaseExecutor.Close(); firstErr == nil {
firstErr = err
err := e.BaseExecutor.Close()
if err != nil {
return err
}
return firstErr
return nil
}
if e.parallelExecValid {
// `Close` may be called after `Open` without calling `Next` in test.
Expand Down Expand Up @@ -240,13 +240,13 @@ func (e *HashAggExec) initForUnparallelExec() {

e.offsetOfSpilledChks, e.numOfSpilledChks = 0, 0
e.executed, e.isChildDrained = false, false
e.listInDisk = chunk.NewDataInDiskByRows(exec.RetTypes(e.Children(0)))
e.dataInDisk = chunk.NewDataInDiskByChunks(exec.RetTypes(e.Children(0)))

e.tmpChkForSpill = exec.TryNewCacheChunk(e.Children(0))
if vars := e.Ctx().GetSessionVars(); vars.TrackAggregateMemoryUsage && variable.EnableTmpStorageOnOOM.Load() {
e.diskTracker = disk.NewTracker(e.ID(), -1)
e.diskTracker.AttachTo(vars.StmtCtx.DiskTracker)
e.listInDisk.GetDiskTracker().AttachTo(e.diskTracker)
e.dataInDisk.GetDiskTracker().AttachTo(e.diskTracker)
vars.MemTracker.FallbackOldAndSetNewActionForSoftLimit(e.ActionSpill())
}
}
Expand Down Expand Up @@ -553,8 +553,8 @@ func (e *HashAggExec) resetSpillMode() {
e.partialResultMap = make(AggPartialResultMapper)
e.bInMap = 0
e.prepared = false
e.executed = e.numOfSpilledChks == e.listInDisk.NumChunks() // No data is spilling again, all data have been processed.
e.numOfSpilledChks = e.listInDisk.NumChunks()
e.executed = e.numOfSpilledChks == e.dataInDisk.NumChunks() // No data is spilling again, all data have been processed.
e.numOfSpilledChks = e.dataInDisk.NumChunks()
e.memTracker.ReplaceBytesUsed(setSize)
atomic.StoreUint32(&e.inSpillMode, 0)
}
Expand All @@ -563,7 +563,7 @@ func (e *HashAggExec) resetSpillMode() {
func (e *HashAggExec) execute(ctx context.Context) (err error) {
defer func() {
if e.tmpChkForSpill.NumRows() > 0 && err == nil {
err = e.listInDisk.Add(e.tmpChkForSpill)
err = e.dataInDisk.Add(e.tmpChkForSpill)
e.tmpChkForSpill.Reset()
}
}()
Expand Down Expand Up @@ -633,12 +633,12 @@ func (e *HashAggExec) execute(ctx context.Context) (err error) {

func (e *HashAggExec) spillUnprocessedData(isFullChk bool) (err error) {
if isFullChk {
return e.listInDisk.Add(e.childResult)
return e.dataInDisk.Add(e.childResult)
}
for i := 0; i < e.childResult.NumRows(); i++ {
e.tmpChkForSpill.AppendRow(e.childResult.GetRow(i))
if e.tmpChkForSpill.IsFull() {
err = e.listInDisk.Add(e.tmpChkForSpill)
err = e.dataInDisk.Add(e.tmpChkForSpill)
if err != nil {
return err
}
Expand All @@ -660,7 +660,7 @@ func (e *HashAggExec) getNextChunk(ctx context.Context) (err error) {
e.isChildDrained = true
}
if e.offsetOfSpilledChks < e.numOfSpilledChks {
e.childResult, err = e.listInDisk.GetChunk(e.offsetOfSpilledChks)
e.childResult, err = e.dataInDisk.GetChunk(e.offsetOfSpilledChks)
if err != nil {
return err
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/util/chunk/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go_library(
srcs = [
"alloc.go",
"chunk.go",
"chunk_in_disk.go",
"chunk_util.go",
"codec.go",
"column.go",
Expand Down Expand Up @@ -43,6 +44,7 @@ go_test(
timeout = "short",
srcs = [
"alloc_test.go",
"chunk_in_disk_test.go",
"chunk_test.go",
"chunk_util_test.go",
"codec_test.go",
Expand Down
12 changes: 12 additions & 0 deletions pkg/util/chunk/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,18 @@ const (
ZeroCapacity = 0
)

// NewEmptyChunk returns a Chunk that holds one column per entry of fields,
// each produced by NewEmptyColumn for the corresponding field type.
// Only the columns are set; every other Chunk field keeps its zero value.
func NewEmptyChunk(fields []*types.FieldType) *Chunk {
	// Allocate the column slice at its final length so each slot is
	// filled by index rather than grown through append.
	cols := make([]*Column, len(fields))
	for i, fieldType := range fields {
		cols[i] = NewEmptyColumn(fieldType)
	}
	return &Chunk{columns: cols}
}

// NewChunkWithCapacity creates a new chunk with field types and capacity.
func NewChunkWithCapacity(fields []*types.FieldType, capacity int) *Chunk {
return New(fields, capacity, capacity)
Expand Down
Loading

0 comments on commit d1e87da

Please sign in to comment.