Skip to content

Commit

Permalink
executor: parallel read inner table and build hash table. (#7544)
Browse files Browse the repository at this point in the history
  • Loading branch information
crazycs520 authored and zz-jason committed Sep 4, 2018
1 parent 418cdf2 commit ac8a61e
Showing 1 changed file with 50 additions and 22 deletions.
72 changes: 50 additions & 22 deletions executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,20 +263,40 @@ func (e *HashJoinExec) wait4Inner() (finished bool, err error) {

// fetchInnerRows fetches all rows from the inner executor
// and appends them to e.innerResult.
func (e *HashJoinExec) fetchInnerRows(ctx context.Context) (err error) {
func (e *HashJoinExec) fetchInnerRows(ctx context.Context, chkCh chan<- *chunk.Chunk, doneCh chan struct{}) {
defer func() {
close(chkCh)
if r := recover(); r != nil {
buf := make([]byte, 4096)
stackSize := runtime.Stack(buf, false)
buf = buf[:stackSize]
log.Errorf("hash join inner fetcher panic stack is:\n%s", buf)
}
}()
e.innerResult = chunk.NewList(e.innerExec.retTypes(), e.maxChunkSize)
e.innerResult.GetMemTracker().AttachTo(e.memTracker)
e.innerResult.GetMemTracker().SetLabel("innerResult")
var err error
for {
if e.finished.Load().(bool) {
return nil
}
chk := e.children[e.innerIdx].newChunk()
err = e.innerExec.Next(ctx, chk)
if err != nil || chk.NumRows() == 0 {
return errors.Trace(err)
select {
case <-doneCh:
return
default:
if e.finished.Load().(bool) {
return
}
chk := e.children[e.innerIdx].newChunk()
err = e.innerExec.Next(ctx, chk)
if err != nil {
e.innerFinished <- errors.Trace(err)
return
}
if chk.NumRows() == 0 {
return
}
chkCh <- chk
e.innerResult.Add(chk)
}
e.innerResult.Add(chk)
}
}

Expand Down Expand Up @@ -512,26 +532,31 @@ func (e *HashJoinExec) fetchInnerAndBuildHashTable(ctx context.Context) {
}
close(e.innerFinished)
}()

if err := e.fetchInnerRows(ctx); err != nil {
e.innerFinished <- errors.Trace(err)
return
}
// innerResultCh transfers inner result chunks from the inner fetcher to the hash table builder.
innerResultCh := make(chan *chunk.Chunk, e.concurrency)
doneCh := make(chan struct{})
go e.fetchInnerRows(ctx, innerResultCh, doneCh)

if e.finished.Load().(bool) {
return
}

if err := e.buildHashTableForList(); err != nil {
// TODO: Build the hash table in parallel. Currently not supported because `mvmap` is not thread-safe.
err := e.buildHashTableForList(innerResultCh)
if err != nil {
e.innerFinished <- errors.Trace(err)
return
close(doneCh)
// fetchInnerRows may be blocked by this channel, so read from the channel to unblock it.
select {
case <-innerResultCh:
default:
}
}
}

// buildHashTableForList builds hash table from `list`.
// key of hash table: hash value of key columns
// value of hash table: RowPtr of the corresponding row
func (e *HashJoinExec) buildHashTableForList() error {
func (e *HashJoinExec) buildHashTableForList(innerResultCh chan *chunk.Chunk) error {
e.hashTable = mvmap.NewMVMap()
e.innerKeyColIdx = make([]int, len(e.innerKeys))
for i := range e.innerKeys {
Expand All @@ -543,23 +568,26 @@ func (e *HashJoinExec) buildHashTableForList() error {
keyBuf = make([]byte, 0, 64)
valBuf = make([]byte, 8)
)
for i := 0; i < e.innerResult.NumChunks(); i++ {

chkIdx := uint32(0)
for chk := range innerResultCh {
if e.finished.Load().(bool) {
return nil
}
chk := e.innerResult.GetChunk(i)
for j := 0; j < chk.NumRows(); j++ {
numRows := chk.NumRows()
for j := 0; j < numRows; j++ {
hasNull, keyBuf, err = e.getJoinKeyFromChkRow(false, chk.GetRow(j), keyBuf)
if err != nil {
return errors.Trace(err)
}
if hasNull {
continue
}
rowPtr := chunk.RowPtr{ChkIdx: uint32(i), RowIdx: uint32(j)}
rowPtr := chunk.RowPtr{ChkIdx: chkIdx, RowIdx: uint32(j)}
*(*chunk.RowPtr)(unsafe.Pointer(&valBuf[0])) = rowPtr
e.hashTable.Put(keyBuf, valBuf)
}
chkIdx++
}
return nil
}
Expand Down

0 comments on commit ac8a61e

Please sign in to comment.